hemanthsavasere commented on code in PR #2808:
URL: https://github.com/apache/fluss/pull/2808#discussion_r2912586004


##########
fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java:
##########
@@ -745,33 +745,37 @@ public CompletedSnapshot triggerAndWaitSnapshot(TableBucket tableBucket) {
     }
 
     private Long triggerSnapshot(TableBucket tableBucket) {
-        Long snapshotId = null;
-        Long nextSnapshotId = null;
         for (TabletServer ts : tabletServers.values()) {
             ReplicaManager.HostedReplica replica = ts.getReplicaManager().getReplica(tableBucket);
             if (replica instanceof ReplicaManager.OnlineReplica) {
                 Replica r = ((ReplicaManager.OnlineReplica) replica).getReplica();
                 PeriodicSnapshotManager kvSnapshotManager = r.getKvSnapshotManager();
                 if (r.isLeader() && kvSnapshotManager != null) {
-                    snapshotId = kvSnapshotManager.currentSnapshotId();
+                    long snapshotId = kvSnapshotManager.currentSnapshotId();
                     kvSnapshotManager.triggerSnapshot();
-                    nextSnapshotId = kvSnapshotManager.currentSnapshotId();
-                    break;
+                    // Poll until the snapshot ID increments, confirming the async trigger was
+                    // processed. triggerSnapshot() submits work to a guardedExecutor
+                    // asynchronously, so the counter may not have incremented yet on return.
+                    // If the ID does not increment within the timeout, the snapshot was
+                    // legitimately skipped (e.g., no new data since last snapshot).
+                    long deadline = System.currentTimeMillis() + 1_000;
+                    while (kvSnapshotManager.currentSnapshotId() <= snapshotId) {
+                        if (System.currentTimeMillis() > deadline) {
+                            return null;
+                        }
+                        try {
+                            Thread.sleep(10);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return snapshotId;

Review Comment:
   Both retry() and waitUntil() throw or fail on timeout, whereas the current
   while loop returns null on timeout, which is fundamentally different
   behavior. The null return is intentional: the caller triggerAndWaitSnapshot()
   (line 737) checks for null and calls fail() only if no snapshot was
   triggered, distinguishing "not triggered" from "legitimately skipped".
   
   Also, wrapping retry() or waitUntil() in a try-catch just to swallow its
   failure defeats the purpose of those helpers and is arguably less clear than
   the current while loop.
   
   A new utility such as pollUntil() that returns Optional<T> could be added to
   cover this case.
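   A minimal sketch of the pollUntil() idea follows. The names PollUtils and pollUntil() and the Supplier/Predicate/Duration signature are hypothetical. They are not part of Fluss's existing test utilities. The helper returns Optional.empty() on timeout instead of throwing, so "timed out" stays an ordinary result the caller can interpret. In main(), an AtomicLong stands in for currentSnapshotId(), and a background thread bumps it to mimic the asynchronous trigger.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical polling helper; not part of Fluss's existing test utilities. */
final class PollUtils {

    private PollUtils() {}

    /**
     * Hypothetical pollUntil(): evaluates {@code supplier} every {@code interval} until
     * {@code condition} accepts the value or {@code timeout} elapses. Returns the accepted
     * value, or Optional.empty() on timeout instead of throwing. Assumes the supplier
     * never returns null.
     */
    static <T> Optional<T> pollUntil(
            Supplier<T> supplier,
            Predicate<? super T> condition,
            Duration timeout,
            Duration interval) {
        // nanoTime() is monotonic, so the deadline is unaffected by wall-clock adjustments.
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            T value = supplier.get();
            if (condition.test(value)) {
                return Optional.of(value);
            }
            if (System.nanoTime() - deadline >= 0) {
                return Optional.empty(); // timed out: the caller decides what that means
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating, as the loop in the diff does.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for currentSnapshotId(): a counter bumped asynchronously after a delay,
        // mimicking work that triggerSnapshot() hands off to an executor.
        AtomicLong snapshotCounter = new AtomicLong(7);
        long before = snapshotCounter.get();
        Thread trigger =
                new Thread(
                        () -> {
                            try {
                                Thread.sleep(50);
                                snapshotCounter.incrementAndGet();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        });
        trigger.start();

        Optional<Long> advanced =
                pollUntil(
                        snapshotCounter::get,
                        id -> id > before,
                        Duration.ofSeconds(1),
                        Duration.ofMillis(10));
        trigger.join();
        System.out.println(advanced.map(id -> "advanced to " + id).orElse("skipped"));

        // Nothing bumps the counter again, so this poll times out with an empty Optional.
        long current = snapshotCounter.get();
        Optional<Long> again =
                pollUntil(
                        snapshotCounter::get,
                        id -> id > current,
                        Duration.ofMillis(100),
                        Duration.ofMillis(10));
        System.out.println(again.map(id -> "advanced to " + id).orElse("skipped"));
    }
}
```

   With a helper along these lines, the while loop in triggerSnapshot() could shrink to a single call. That call would be pollUntil(kvSnapshotManager::currentSnapshotId, id -> id > snapshotId, Duration.ofSeconds(1), Duration.ofMillis(10)), with an empty result mapped to null. This keeps the "legitimately skipped" semantics without a try-catch around a throwing helper.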



-- 
This is an automated message from the Apache Git Service.