huaxingao commented on code in PR #14773:
URL: https://github.com/apache/iceberg/pull/14773#discussion_r2608646989
##########
core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java:
##########
@@ -105,9 +111,145 @@ public class CatalogHandlers {
private static final InMemoryPlanningState IN_MEMORY_PLANNING_STATE =
InMemoryPlanningState.getInstance();
private static final ExecutorService ASYNC_PLANNING_POOL =
Executors.newSingleThreadExecutor();
+ // Advanced idempotency store with TTL and in-flight coalescing
+ private static final ConcurrentMap<String, IdempotencyEntry>
IDEMPOTENCY_STORE =
+ Maps.newConcurrentMap();
+ private static final Set<String> SIMULATE_503_ON_FIRST_SUCCESS_KEYS =
Sets.newConcurrentHashSet();
+ private static volatile long idempotencyLifetimeMillis =
TimeUnit.MINUTES.toMillis(30);
private CatalogHandlers() {}
+ /**
+ * Execute a mutation with basic idempotency semantics based on the
Idempotency-Key header.
+ *
+ * <p>This simple reference implementation stores the response in-memory
keyed by the header
+ * value. If the same key is seen again, the stored response is returned and
the action is not
+ * re-executed. This is suitable for tests and examples; production servers
should provide a
+ * durable store and TTL management.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T extends RESTResponse> T withIdempotency(
+ HTTPRequest httpRequest, Supplier<T> action) {
+ return withIdempotencyInternal(httpRequest, action);
+ }
+
+ public static void withIdempotency(HTTPRequest httpRequest, Runnable action)
{
+ withIdempotencyInternal(
+ httpRequest,
+ () -> {
+ action.run();
+ return Boolean.TRUE;
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T withIdempotencyInternal(HTTPRequest httpRequest,
Supplier<T> action) {
+ Optional<HTTPHeaders.HTTPHeader> keyHeader =
+ httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+ if (keyHeader.isEmpty()) {
+ return action.get();
+ }
+
+ String key = keyHeader.get().value();
+
+ // check existing entry and TTL
+ IdempotencyEntry existing = IDEMPOTENCY_STORE.get(key);
+ if (existing != null) {
+ long now = System.currentTimeMillis();
+ boolean expired =
+ existing.status == IdempotencyEntry.Status.FINALIZED
+ && (now - existing.firstSeenMillis) > idempotencyLifetimeMillis;
Review Comment:
Fixed. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]