dimas-b commented on code in PR #3803: URL: https://github.com/apache/polaris/pull/3803#discussion_r2831219816
########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyMaintenance.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.idempotency; + +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import io.vertx.core.Vertx; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import java.time.Clock; +import java.time.Instant; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.service.context.RealmContextConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Background maintenance for idempotency (purge of expired keys). */ +@ApplicationScoped +public class IdempotencyMaintenance { + + private static final Logger LOGGER = LoggerFactory.getLogger(IdempotencyMaintenance.class); + + @Inject IdempotencyConfiguration configuration; + @Inject RealmContextConfiguration realmContextConfiguration; + @Inject IdempotencyStore store; + @Inject Clock clock; + @Inject Vertx vertx; + + private volatile Long purgeTimerId; + + void onStart(@Observes StartupEvent event) { + if (!configuration.enabled() || !configuration.purgeEnabled()) { + return; + } + long intervalMs = Math.max(1000L, configuration.purgeIntervalSeconds() * 1000L); + purgeTimerId = vertx.setPeriodic(intervalMs, ignored -> purgeOnce()); Review Comment: Will this run `purgeOnce()` on a Vert.x thread (not good for blocking tasks) or on an executor thread? ########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyFilter.java: ########## @@ -0,0 +1,636 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.idempotency; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.vertx.core.Vertx; +import jakarta.inject.Inject; +import jakarta.ws.rs.Priorities; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.ContainerResponseContext; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.SecurityContext; +import java.lang.reflect.Method; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeoutException; +import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.CatalogEntity; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.apache.polaris.core.persistence.resolver.Resolver; +import org.apache.polaris.core.persistence.resolver.ResolverFactory; +import org.apache.polaris.core.persistence.resolver.ResolverStatus; +import org.apache.polaris.service.catalog.CatalogPrefixParser; +import org.apache.polaris.service.catalog.iceberg.IcebergCatalogAdapter; +import org.apache.polaris.service.context.RealmContextFilter; +import org.jboss.resteasy.reactive.server.ServerRequestFilter; +import org.jboss.resteasy.reactive.server.ServerResponseFilter; +import org.jboss.resteasy.reactive.server.core.CurrentRequestManager; +import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; +import org.jboss.resteasy.reactive.server.spi.ResteasyReactiveResourceInfo; + +/** + * HTTP idempotency integration at the request/response layer. + * + * <p>For in-scope requests (see {@link IdempotencyConfiguration#scopes()}), this filter reserves an + * idempotency key before executing the request, and replays the previously finalized response when + * a duplicate key is received. For owned requests, it finalizes the response summary/headers for + * future replay. + */ +public class IdempotencyFilter { + + private static final String PROP_IDEMPOTENCY_KEY = "idempotency.key"; + private static final String PROP_IDEMPOTENCY_OPERATION = "idempotency.operation"; + private static final String PROP_IDEMPOTENCY_RESOURCE = "idempotency.resource"; + private static final String PROP_IDEMPOTENCY_OWNED = "idempotency.owned"; + private static final String PROP_IDEMPOTENCY_HEARTBEAT_TIMER_ID = "idempotency.heartbeat.timerId"; + + @Inject IdempotencyConfiguration configuration; + @Inject IdempotencyStore store; + @Inject Clock clock; + @Inject ObjectMapper objectMapper; + @Inject Vertx vertx; + @Inject ResolverFactory resolverFactory; + @Inject CatalogPrefixParser prefixParser; + + @ServerRequestFilter(priority = Priorities.AUTHORIZATION + 10) + public Uni<Response> reserveOrReplay(ContainerRequestContext rc) { + if (!configuration.enabled()) { + return Uni.createFrom().nullItem(); + } + + SecurityContext securityContext = rc.getSecurityContext(); + + String rawKey = rc.getHeaderString(configuration.keyHeader()); + if (rawKey == null || rawKey.isBlank()) { + return Uni.createFrom().nullItem(); + } + String key = normalizeIdempotencyKey(rawKey); + if (key == null) { + return Uni.createFrom() + .item( + error( + 400, + "idempotency_key_invalid", + "Idempotency-Key must be a UUIDv7 string (RFC 9562)")); + } + + boolean scopedMode = !configuration.scopes().isEmpty(); + if (!scopedMode) { + // Without an explicit scope allowlist, only apply idempotency to Iceberg REST catalog + // mutating endpoints under /v1/{prefix}/... + if (!isMutatingMethod(rc.getMethod()) || !isIcebergMutationEndpoint(rc)) { + return Uni.createFrom().nullItem(); + } + } + + RealmContext realmContext = (RealmContext) rc.getProperty(RealmContextFilter.REALM_CONTEXT_KEY); + if (realmContext == null) { + // RealmContextFilter should run before this; treat missing realm as a server error. + return Uni.createFrom().item(error(500, "MissingRealmContext", "Missing realm context")); + } + + // If scopes are configured, apply idempotency only to matching endpoints and use the configured + // operationType for stable binding. + IdempotencyConfiguration.Scope scope = matchScope(rc); + if (scope == null && scopedMode) { + return Uni.createFrom().nullItem(); + } + + PolarisPrincipal principal = + securityContext != null && securityContext.getUserPrincipal() instanceof PolarisPrincipal p + ? p + : null; + if (principal != null && !isPolarisManagedInternalIcebergCatalogRequest(rc, principal)) { + // for federated/external catalogs, Polaris may not enforce idempotency end-to-end, + // so treat Idempotency-Key as a no-op (do not reserve/finalize/replay). + return Uni.createFrom().nullItem(); + } + + String realmId = realmContext.getRealmIdentifier(); + String operationType = scope == null ? normalizeOperationType(rc) : scope.operationType(); + String resourceId = normalizeResourceId(rc, scope != null); + + Instant now = clock.instant(); + Instant expiresAt = + now.plusSeconds(Math.max(0L, configuration.ttlSeconds())) + .plusSeconds(Math.max(0L, configuration.ttlGraceSeconds())); + + final IdempotencyStore.ReserveResult r; + try { + r = + store.reserve( Review Comment: It is an anti-pattern to call a potentially blocking method (I/O) from an async filter. ########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyFilter.java: ########## @@ -0,0 +1,636 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.idempotency; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.vertx.core.Vertx; +import jakarta.inject.Inject; +import jakarta.ws.rs.Priorities; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.ContainerResponseContext; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.SecurityContext; +import java.lang.reflect.Method; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeoutException; +import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.CatalogEntity; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.apache.polaris.core.persistence.resolver.Resolver; +import org.apache.polaris.core.persistence.resolver.ResolverFactory; +import org.apache.polaris.core.persistence.resolver.ResolverStatus; +import org.apache.polaris.service.catalog.CatalogPrefixParser; +import org.apache.polaris.service.catalog.iceberg.IcebergCatalogAdapter; +import org.apache.polaris.service.context.RealmContextFilter; +import org.jboss.resteasy.reactive.server.ServerRequestFilter; +import org.jboss.resteasy.reactive.server.ServerResponseFilter; +import org.jboss.resteasy.reactive.server.core.CurrentRequestManager; +import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; +import org.jboss.resteasy.reactive.server.spi.ResteasyReactiveResourceInfo; + +/** + * HTTP idempotency integration at the request/response layer. + * + * <p>For in-scope requests (see {@link IdempotencyConfiguration#scopes()}), this filter reserves an + * idempotency key before executing the request, and replays the previously finalized response when + * a duplicate key is received. For owned requests, it finalizes the response summary/headers for + * future replay. + */ +public class IdempotencyFilter { + + private static final String PROP_IDEMPOTENCY_KEY = "idempotency.key"; + private static final String PROP_IDEMPOTENCY_OPERATION = "idempotency.operation"; + private static final String PROP_IDEMPOTENCY_RESOURCE = "idempotency.resource"; + private static final String PROP_IDEMPOTENCY_OWNED = "idempotency.owned"; + private static final String PROP_IDEMPOTENCY_HEARTBEAT_TIMER_ID = "idempotency.heartbeat.timerId"; + + @Inject IdempotencyConfiguration configuration; + @Inject IdempotencyStore store; + @Inject Clock clock; + @Inject ObjectMapper objectMapper; + @Inject Vertx vertx; + @Inject ResolverFactory resolverFactory; + @Inject CatalogPrefixParser prefixParser; + + @ServerRequestFilter(priority = Priorities.AUTHORIZATION + 10) + public Uni<Response> reserveOrReplay(ContainerRequestContext rc) { + if (!configuration.enabled()) { + return Uni.createFrom().nullItem(); Review Comment: Would it work if we had a CDI producer method conditionally emitting a bean with `@ServerRequestFilter`? The idea is to avoid having a filter at all if `configuration.enabled()` is `false`. ########## runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java: ########## @@ -217,6 +219,20 @@ public MetaStoreManagerFactory metaStoreManagerFactory( return metaStoreManagerFactories.select(Identifier.Literal.of(config.type())).get(); } + @Produces + @ApplicationScoped + public IdempotencyStoreFactory idempotencyStoreFactory( + PersistenceConfiguration config, + @Any Instance<IdempotencyStoreFactory> idempotencyFactories) { + return idempotencyFactories.select(Identifier.Literal.of(config.type())).get(); + } + + @Produces + @ApplicationScoped + public IdempotencyStore idempotencyStore(IdempotencyStoreFactory factory) { + return factory.create(); Review Comment: Do we really need this factory? The `create()` method does not take any parameters, so whatever is injected into the factory by CDI could also be injected into the `IdempotencyStore` implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
