amogh-jahagirdar commented on code in PR #15126:
URL: https://github.com/apache/iceberg/pull/15126#discussion_r2743679465
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -4394,6 +4402,102 @@ private void expectNotModifiedResponseForLoadTable(
any());
}
+ /**
+ * Test concurrent appends on multiple branches simultaneously to verify proper handling of
+ * sequence number conflicts.
+ *
+ * <p>Creates 5 different branches on the table, then performs 10 parallel append commits on each
+ * branch at the same time (50 total concurrent operations). This verifies that: 1. Sequence
+ * number conflicts are caught by AssertLastSequenceNumber requirement 2. Conflicts result in
+ * CommitFailedException (retryable) not ValidationException (non-retryable) 3. The REST catalog
+ * properly handles concurrent modifications across different branches
+ */
+ @Test
+ public void testConcurrentAppendsOnMultipleBranches() {
+ int numBranches = 5;
+ int commitsPerBranch = 10;
+ int totalConcurrentWrites = numBranches * commitsPerBranch;
+
+ RESTCatalog restCatalog = catalog();
+
+ Namespace ns = Namespace.of("concurrent_test");
+ TableIdentifier tableIdent = TableIdentifier.of(ns, "test_table");
+
+ restCatalog.createNamespace(ns);
+ Table table = restCatalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create();
+
+ // Add initial data to the main branch
+ table.newFastAppend().appendFile(FILE_A).commit();
+
+ // Create 5 branches from the main branch
+ String[] branchNames = new String[numBranches];
+ for (int i = 0; i < numBranches; i++) {
+ branchNames[i] = "branch-" + i;
+ table.manageSnapshots().createBranch(branchNames[i]).commit();
+ }
+
+ // Refresh to get updated metadata with all branches
+ restCatalog.loadTable(tableIdent);
+
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger validationFailureCount = new AtomicInteger(0);
+
+ ExecutorService executor =
+ MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(totalConcurrentWrites));
+
+ Tasks.range(totalConcurrentWrites)
+ .executeWith(executor)
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (taskIndex, exception) -> {
+ // Check if sequence number validation error (indicates fix not working)
+ if (exception instanceof BadRequestException
+ && exception.getMessage().contains("Cannot add snapshot with sequence number")) {
+ validationFailureCount.incrementAndGet();
+ } else if (exception instanceof ValidationException) {
+ validationFailureCount.incrementAndGet();
+ }
+ // CommitFailedException is expected - this is the correct retryable behavior
+ })
+ .run(
+ taskIndex -> {
+ int branchIdx = taskIndex / commitsPerBranch;
+ int commitIdx = taskIndex % commitsPerBranch;
+ String branchName = branchNames[branchIdx];
+
+ // Each thread loads the table independently
+ Table localTable = restCatalog.loadTable(tableIdent);
+
+ // Create a unique file for this commit
+ DataFile newFile =
+ DataFiles.builder(SPEC)
+ .withPath(
+ String.format(
+ "/path/to/branch-%d-commit-%d.parquet", branchIdx, commitIdx))
+ .withFileSizeInBytes(15)
+ .withPartitionPath(String.format("id_bucket=%d", branchIdx % 16))
+ .withRecordCount(3)
+ .build();
+
+ // Append to the specific branch
+ localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit();
+
+ successCount.incrementAndGet();
+ });
+
+ // Verify the fix: with AssertLastSequenceNumber, there should be NO validation failures
+ // All concurrent conflicts should be caught as CommitFailedException (retryable)
+ assertThat(validationFailureCount.get())
+ .as(
+ "With the fix, sequence number conflicts should be caught by AssertLastSequenceNumber "
+ + "and throw CommitFailedException (retryable), not ValidationException")
+ .isEqualTo(0);
+
+ // At least some should succeed (commits that don't conflict or succeed after retry)
+ assertThat(successCount.get()).as("At least some appends should succeed").isGreaterThan(0);
Review Comment:
I think we should aim to give this test stronger assertions. We could use an
AtomicInteger as a barrier to synchronize the different rounds of commits and
cause conflicts deterministically. At the end, we'd be able to assert on a
deterministic number of failures (I'd probably organize it so the barrier
causes 1 conflict per branch per round?). Check out
https://github.com/apache/iceberg/blob/main/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcTableConcurrency.java#L130
for another example of this pattern.
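
To illustrate the barrier idea, here is a rough sketch in plain Java. It is
not Iceberg code: a compare-and-set on a version counter stands in for the
table commit, and the class and method names (BarrierConflictSketch, run,
awaitBarrier) are hypothetical. It also assumes a failed commit is not
retried internally. Every writer reads the same base version, waits on the
AtomicInteger barrier, and then all of them try to commit, so each round
yields exactly one success and (writers - 1) conflicts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation: the "table" is a version counter, a "commit" is a compare-and-set.
class BarrierConflictSketch {

  // Spin until the shared counter reaches the threshold for the current phase.
  private static void awaitBarrier(AtomicInteger barrier, int threshold) {
    while (barrier.get() < threshold) {
      Thread.yield();
    }
  }

  // Runs `rounds` synchronized rounds with `writers` threads; returns {successes, conflicts}.
  static int[] run(int writers, int rounds) {
    AtomicInteger version = new AtomicInteger(0);
    AtomicInteger barrier = new AtomicInteger(0);
    AtomicInteger successes = new AtomicInteger(0);
    AtomicInteger conflicts = new AtomicInteger(0);

    // The pool must hold all writers at once, otherwise the barrier would never be reached.
    ExecutorService pool = Executors.newFixedThreadPool(writers);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int w = 0; w < writers; w++) {
        futures.add(
            pool.submit(
                () -> {
                  for (int round = 0; round < rounds; round++) {
                    // Phase 1: every writer reads the same base version, then waits for the rest.
                    int base = version.get();
                    barrier.incrementAndGet();
                    awaitBarrier(barrier, (2 * round + 1) * writers);

                    // All writers now commit against the same base; exactly one can win.
                    if (version.compareAndSet(base, base + 1)) {
                      successes.incrementAndGet();
                    } else {
                      conflicts.incrementAndGet();
                    }

                    // Phase 2: wait until every attempt in this round has finished.
                    barrier.incrementAndGet();
                    awaitBarrier(barrier, (2 * round + 2) * writers);
                  }
                }));
      }
      for (Future<?> future : futures) {
        future.get();
      }
    } catch (Exception e) {
      throw new IllegalStateException("Writer task failed", e);
    } finally {
      pool.shutdownNow();
    }
    return new int[] {successes.get(), conflicts.get()};
  }

  public static void main(String[] args) {
    int[] result = run(3, 4);
    // 4 rounds with 3 writers: prints successes=4 conflicts=8
    System.out.println("successes=" + result[0] + " conflicts=" + result[1]);
  }
}
```

With counts like these the test could assert exact numbers instead of
isGreaterThan(0). In the real test the commit would be the fast append to the
branch, and retries would need to be accounted for, which this sketch leaves out.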
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]