huaxingao commented on code in PR #14773:
URL: https://github.com/apache/iceberg/pull/14773#discussion_r2658490590


##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3312,6 +3319,160 @@ public void 
testClientDoesNotSendIdempotencyWhenServerNotAdvertising() {
     local.dropTable(ident);
   }
 
+  @Test
+  public void testIdempotentDuplicateCreateReturnsCached() {
+    String key = "dup-create-key";
+    Namespace ns = Namespace.of("ns_dup");
+    TableIdentifier ident = TableIdentifier.of(ns, "t_dup");
+    restCatalog.createNamespace(ns, ImmutableMap.of());
+    Pair<RESTClient, Map<String, String>> httpAndHeaders = httpAndHeaders(key);
+    RESTClient http = httpAndHeaders.first();
+    Map<String, String> headers = httpAndHeaders.second();
+    CreateTableRequest req = createReq(ident);
+
+    // First create succeeds
+    LoadTableResponse first =
+        http.post(
+            ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+            req,
+            LoadTableResponse.class,
+            headers,
+            ErrorHandlers.tableErrorHandler());
+    assertThat(first).isNotNull();
+
+    // Verify request shape (method, path, headers including Idempotency-Key)
+    verifyCreatePost(ns, headers);
+
+    // Duplicate with same key returns cached 200 OK
+    LoadTableResponse second =
+        http.post(
+            ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+            req,
+            LoadTableResponse.class,
+            headers,
+            ErrorHandlers.tableErrorHandler());
+    assertThat(second).isNotNull();
+
+    // Clean up
+    restCatalog.dropTable(ident);
+  }
+
+  @Test
+  public void testIdempotencyKeyLifetimeExpiredTreatsAsNew() {
+    // Set TTL to 0 so cached success expires immediately
+    CatalogHandlers.setIdempotencyLifetimeFromIso("PT0S");
+    try {
+      String key = "expired-create-key";
+      Namespace ns = Namespace.of("ns_exp");
+      TableIdentifier ident = TableIdentifier.of(ns, "t_exp");
+      restCatalog.createNamespace(ns, ImmutableMap.of());
+      Pair<RESTClient, Map<String, String>> httpAndHeaders = 
httpAndHeaders(key);
+      RESTClient http = httpAndHeaders.first();
+      Map<String, String> headers = httpAndHeaders.second();
+      CreateTableRequest req = createReq(ident);
+
+      // First create succeeds
+      LoadTableResponse created =
+          http.post(
+              ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+              req,
+              LoadTableResponse.class,
+              headers,
+              ErrorHandlers.tableErrorHandler());
+      assertThat(created).isNotNull();
+
+      // Verify request shape (method, path, headers including Idempotency-Key)
+      verifyCreatePost(ns, headers);
+
+      // TTL expired -> duplicate with same key should be treated as new and 
fail with AlreadyExists
+      assertThatThrownBy(
+              () ->
+                  http.post(
+                      
ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+                      req,
+                      LoadTableResponse.class,
+                      headers,
+                      ErrorHandlers.tableErrorHandler()))
+          .isInstanceOf(AlreadyExistsException.class)
+          .hasMessageContaining(ident.toString());
+
+      // Clean up
+      restCatalog.dropTable(ident);
+    } finally {
+      // Restore default TTL for other tests
+      CatalogHandlers.setIdempotencyLifetimeFromIso("PT30M");
+    }
+  }
+
+  @Test
+  public void testIdempotentCreateReplayAfterSimulated503() {
+    // Use a fixed key and simulate 503 after first success for that key
+    String key = "idemp-create-503";
+    adapterForRESTServer.simulate503OnFirstSuccessForKey(key);
+    Namespace ns = Namespace.of("ns_idemp");
+    TableIdentifier ident = TableIdentifier.of(ns, "t_idemp");
+    restCatalog.createNamespace(ns, ImmutableMap.of());
+    Pair<RESTClient, Map<String, String>> httpAndHeaders = httpAndHeaders(key);
+    RESTClient http = httpAndHeaders.first();
+    Map<String, String> headers = httpAndHeaders.second();
+    CreateTableRequest req = createReq(ident);
+
+    // First attempt: server finalizes success but responds 503
+    assertThatThrownBy(
+            () ->
+                http.post(
+                    
ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+                    req,
+                    LoadTableResponse.class,
+                    headers,
+                    ErrorHandlers.tableErrorHandler()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("simulated transient 503");
+
+    // Verify request shape (method, path, headers including Idempotency-Key)
+    verifyCreatePost(ns, headers);
+
+    // Retry with same key: server should replay 200 OK
+    LoadTableResponse replay =
+        http.post(
+            ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+            req,
+            LoadTableResponse.class,
+            headers,
+            ErrorHandlers.tableErrorHandler());
+    assertThat(replay).isNotNull();
+
+    // Clean up
+    restCatalog.dropTable(ident);
+  }
+
+  @Test
+  public void testIdempotentDropDuplicateNoop() {
+    String key = "idemp-drop-void";
+    Namespace ns = Namespace.of("ns_void");
+    TableIdentifier ident = TableIdentifier.of(ns, "t_void");
+    restCatalog.createNamespace(ns, ImmutableMap.of());
+    Pair<RESTClient, Map<String, String>> httpAndHeaders = httpAndHeaders(key);
+    RESTClient http = httpAndHeaders.first();
+    Map<String, String> headers = httpAndHeaders.second();

Review Comment:
   Thanks for the comment! I have added a smaller helper method for the common 
code. 



##########
core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java:
##########
@@ -105,9 +111,140 @@ public class CatalogHandlers {
   private static final InMemoryPlanningState IN_MEMORY_PLANNING_STATE =
       InMemoryPlanningState.getInstance();
   private static final ExecutorService ASYNC_PLANNING_POOL = 
Executors.newSingleThreadExecutor();
+  // Advanced idempotency store with TTL and in-flight coalescing
+  private static final ConcurrentMap<String, IdempotencyEntry> 
IDEMPOTENCY_STORE =
+      Maps.newConcurrentMap();
+  private static volatile long idempotencyLifetimeMillis = 
TimeUnit.MINUTES.toMillis(30);
 
   private CatalogHandlers() {}
 
+  /**
+   * Execute a mutation with basic idempotency semantics based on the 
Idempotency-Key header.
+   *
+   * <p>This simple reference implementation stores the response in-memory 
keyed by the header
+   * value. If the same key is seen again, the stored response is returned and 
the action is not
+   * re-executed. This is suitable for tests and examples; production servers 
should provide a
+   * durable store and TTL management.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends RESTResponse> T withIdempotency(
+      HTTPRequest httpRequest, Supplier<T> action) {
+    return withIdempotencyInternal(httpRequest, action);
+  }
+
+  public static void withIdempotency(HTTPRequest httpRequest, Runnable action) 
{
+    withIdempotencyInternal(
+        httpRequest,
+        () -> {
+          action.run();
+          return Boolean.TRUE;
+        });
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T withIdempotencyInternal(HTTPRequest httpRequest, 
Supplier<T> action) {
+    Optional<HTTPHeaders.HTTPHeader> keyHeader =
+        httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+    if (keyHeader.isEmpty()) {
+      return action.get();
+    }
+
+    String key = keyHeader.get().value();
+
+    // check existing entry and TTL
+    IdempotencyEntry existing = IDEMPOTENCY_STORE.get(key);

Review Comment:
    Good call — I updated this to avoid the get/remove/put race by using a 
single atomic map operation. I now use IDEMPOTENCY_STORE.compute(...) to either 
reuse the current entry or replace it if missing/expired, and use a per-call 
isLeader flag so only the thread that created the new entry executes the action 
while others wait and replay the finalized result. I used compute (instead of 
computeIfAbsent) because we also need to replace expired entries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to