stevenzwu commented on code in PR #14320:
URL: https://github.com/apache/iceberg/pull/14320#discussion_r2430364026
##########
core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java:
##########
@@ -35,6 +35,14 @@ private RESTCatalogProperties() {}
public static final String PAGE_SIZE = "rest-page-size";
+ // Enable a lightweight client-side reconciliation flow for REST commits
that add a new snapshot
+ // when the server responds with CommitStateUnknown (e.g., transient 5xx).
When enabled, the
+ // client will refresh the table and verify the expected snapshot is present
in history; if so,
+ // the commit is treated as successful without re-executing.
+ public static final String RECONCILE_ON_UNKNOWN_SNAPSHOT_ADD =
+ "rest-reconcile-on-unknown-snapshot-add";
Review Comment:
Nit: this name doesn't clearly convey its purpose. Maybe
`rest-reconcile-on-commit-state-unknown`?
##########
core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java:
##########
@@ -155,8 +158,24 @@ public void commit(TableMetadata base, TableMetadata
metadata) {
// the error handler will throw necessary exceptions like
CommitFailedException and
// UnknownCommitStateException
// TODO: ensure that the HTTP client lib passes HTTP client errors to the
error handler
- LoadTableResponse response =
- client.post(path, request, LoadTableResponse.class, headers,
errorHandler);
+ LoadTableResponse response;
+ try {
+ response = client.post(path, request, LoadTableResponse.class, headers,
errorHandler);
+ } catch (CommitStateUnknownException e) {
+ // Lightweight reconciliation for snapshot-add-only updates on transient
unknown commit state
+ if (reconcileOnUnknownSnapshotAdd && updateType == UpdateType.SIMPLE) {
+ Long expectedSnapshotId = expectedSnapshotIdIfSnapshotAddOnly(updates);
+ if (expectedSnapshotId != null) {
+ // attempt to refresh and verify the expected snapshot became current
+ TableMetadata refreshed = refresh();
Review Comment:
We should probably wrap the reconciliation step in a try-catch. If reconciliation itself
fails (e.g., the refresh throws), we should re-throw the original commit exception `e`.
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -2900,6 +2901,114 @@ public <T extends RESTResponse> T execute(
return catalog(adapter);
}
+ @Test
+ public void testUnknownWithoutReconcileThrowsAndNoRetry() {
+ RESTCatalogAdapter adapter = Mockito.spy(new
RESTCatalogAdapter(backendCatalog));
+
+ RESTCatalog catalog =
+ new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config)
-> adapter);
+ // property omitted or set to false: reconciliation disabled
+ catalog.initialize(
+ "test",
+ ImmutableMap.of(
+ CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO",
+ RESTCatalogProperties.RECONCILE_ON_UNKNOWN_SNAPSHOT_ADD, "false"));
+
+ if (requiresNamespaceCreate()) {
+ catalog.createNamespace(TABLE.namespace());
+ }
+
+ catalog.createTable(TABLE, SCHEMA);
+
+ Mockito.doAnswer(
+ invocation -> {
+ invocation.callRealMethod();
+ throw new CommitStateUnknownException(
+ new ServiceFailureException("Service failed: 503"));
+ })
+ .when(adapter)
+ .execute(
+ reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)),
+ eq(LoadTableResponse.class),
+ any(),
+ any());
+
+ Table table = catalog.loadTable(TABLE);
+
+ assertThatThrownBy(() -> table.newFastAppend().appendFile(FILE_A).commit())
+ .isInstanceOf(CommitStateUnknownException.class)
+ .hasMessageContaining("Cannot determine whether the commit was
successful");
+ }
+
+ @Test
+ public void testReconcileOnUnknownSnapshotAddMatchesSnapshotId() {
+ RESTCatalogAdapter adapter = Mockito.spy(new
RESTCatalogAdapter(backendCatalog));
+
+ RESTCatalog catalog =
+ new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config)
-> adapter);
+ catalog.initialize(
+ "test",
+ ImmutableMap.of(
+ CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO",
+ RESTCatalogProperties.RECONCILE_ON_UNKNOWN_SNAPSHOT_ADD, "true"));
+
+ if (requiresNamespaceCreate()) {
+ catalog.createNamespace(TABLE.namespace());
+ }
+
+ catalog.createTable(TABLE, SCHEMA);
+
+ // Simulate: server commits, but client receives CommitStateUnknown
(transient 5xx)
+ Mockito.doAnswer(
+ invocation -> {
+ invocation.callRealMethod();
+ throw new CommitStateUnknownException(
+ new ServiceFailureException("Service failed: 503"));
+ })
+ .when(adapter)
+ .execute(
+ reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)),
+ eq(LoadTableResponse.class),
+ any(),
+ any());
+
+ Table table = catalog.loadTable(TABLE);
+
+ // Perform a snapshot-adding commit; should reconcile instead of failing
+ table.newFastAppend().appendFile(FILE_A).commit();
+
+ // Extract the snapshot id we attempted to commit from the request body
+ long expectedSnapshotId =
+ allRequests(adapter).stream()
+ .filter(
+ r -> r.method() == HTTPMethod.POST &&
r.path().equals(RESOURCE_PATHS.table(TABLE)))
+ .map(HTTPRequest::body)
+ .filter(UpdateTableRequest.class::isInstance)
+ .map(UpdateTableRequest.class::cast)
+ .map(
+ req ->
+ (MetadataUpdate.AddSnapshot)
+ req.updates().stream()
+ .filter(u -> u instanceof
MetadataUpdate.AddSnapshot)
+ .findFirst()
+ .orElseThrow())
+ .map(add -> add.snapshot().snapshotId())
+ .findFirst()
+ .orElseThrow();
+
+ Table reloaded = catalog.loadTable(TABLE);
+ assertThat(reloaded.currentSnapshot()).isNotNull();
+ assertThat(reloaded.snapshot(expectedSnapshotId)).isNotNull();
+
+ // Verify the POST was not re-executed
Review Comment:
The POST is not retried even when reconciliation is disabled, so I'm not sure
what this check is meant to verify.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]