leekeiabstraction commented on code in PR #2808:
URL: https://github.com/apache/fluss/pull/2808#discussion_r3005238451


##########
fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java:
##########
@@ -745,33 +745,37 @@ public CompletedSnapshot triggerAndWaitSnapshot(TableBucket tableBucket) {
     }
 
     private Long triggerSnapshot(TableBucket tableBucket) {
-        Long snapshotId = null;
-        Long nextSnapshotId = null;
         for (TabletServer ts : tabletServers.values()) {
             ReplicaManager.HostedReplica replica = ts.getReplicaManager().getReplica(tableBucket);
             if (replica instanceof ReplicaManager.OnlineReplica) {
                 Replica r = ((ReplicaManager.OnlineReplica) replica).getReplica();
                 PeriodicSnapshotManager kvSnapshotManager = r.getKvSnapshotManager();
                 if (r.isLeader() && kvSnapshotManager != null) {
-                    snapshotId = kvSnapshotManager.currentSnapshotId();
+                    long snapshotId = kvSnapshotManager.currentSnapshotId();
                     kvSnapshotManager.triggerSnapshot();
-                    nextSnapshotId = kvSnapshotManager.currentSnapshotId();
-                    break;
+                    // Poll until the snapshot ID increments, confirming the async trigger was
+                    // processed. triggerSnapshot() submits work to a guardedExecutor
+                    // asynchronously, so the counter may not have incremented yet on return.
+                    // If the ID does not increment within the timeout, the snapshot was
+                    // legitimately skipped (e.g., no new data since last snapshot).
+                    long deadline = System.currentTimeMillis() + 1_000;
+                    while (kvSnapshotManager.currentSnapshotId() <= snapshotId) {
+                        if (System.currentTimeMillis() > deadline) {
+                            return null;
+                        }
+                        try {
+                            Thread.sleep(10);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return snapshotId;

Review Comment:
   Hello @hemanthsavasere 
   
   I'm reviewing this because there have been recent test failures related to it. The root cause looks correct, and I agree with the semantic difference between returning null and throwing an exception.
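
   To make the root cause concrete, here is a small self-contained sketch (not Fluss code). `FakeSnapshotManager` is a hypothetical stand-in for `PeriodicSnapshotManager` whose `triggerSnapshot()` only submits work to an executor, so a `currentSnapshotId()` read immediately after it returns still sees the old ID. A `CountDownLatch` holds the submitted task back so the race is deterministic rather than timing-dependent.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class AsyncTriggerDemo {

    /** Hypothetical stand-in for PeriodicSnapshotManager with an async trigger. */
    static final class FakeSnapshotManager {
        private final AtomicLong snapshotId = new AtomicLong(0);
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final CountDownLatch gate;

        FakeSnapshotManager(CountDownLatch gate) {
            this.gate = gate;
        }

        long currentSnapshotId() {
            return snapshotId.get();
        }

        /** Submits the work and returns immediately; the ID is bumped later. */
        void triggerSnapshot() {
            executor.execute(() -> {
                try {
                    gate.await(); // simulate a busy executor
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                snapshotId.incrementAndGet();
            });
        }

        /** Waits until all submitted work has finished. */
        void awaitIdle() {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Executor did not finish in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /** Returns {before trigger, right after trigger, after the task ran}. */
    static long[] run() {
        CountDownLatch gate = new CountDownLatch(1);
        FakeSnapshotManager manager = new FakeSnapshotManager(gate);
        long before = manager.currentSnapshotId();
        manager.triggerSnapshot();
        long rightAfterTrigger = manager.currentSnapshotId(); // task still held by the gate
        gate.countDown();
        manager.awaitIdle();
        long afterTaskRan = manager.currentSnapshotId();
        return new long[] {before, rightAfterTrigger, afterTaskRan};
    }

    public static void main(String[] args) {
        long[] ids = run();
        System.out.println(ids[0] + " " + ids[1] + " " + ids[2]); // prints "0 0 1"
    }
}
```

   This is the race the polling loop in the diff works around: a single read right after `triggerSnapshot()` returns can observe the pre-trigger ID even though the snapshot will still be taken.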
   
   However, consider the callers of `triggerSnapshot(TableBucket tableBucket)`:
   
   1. `CompletedSnapshot triggerAndWaitSnapshot(TableBucket tableBucket)`, which you pointed out. On a null snapshot ID, it fails with `No new snapshot triggered for table bucket <>`. `triggerSnapshot(TableBucket tableBucket)` could instead use `retry()` with an assertion that the snapshot ID increments, plus `withFailMessage()` to keep the same failure message.
   2. `void triggerAndWaitSnapshots(Collection<TableBucket> tableBuckets)` currently triggers snapshots but silently skips `waitUntilSnapshotFinished` for null snapshot IDs. This means tests that call it may miss cases where a snapshot was not actually triggered. Adding the assertion through `retry()` would harden these tests.
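
   A rough sketch of both suggestions, under stated assumptions: `retry` here is a minimal hypothetical stand-in for the test-utils `retry()` helper (its real signature is not shown in this thread), `SnapshotTrigger` is a hypothetical interface exposing only the two `PeriodicSnapshotManager` calls the diff uses, and buckets are plain strings. Throwing an `AssertionError` plays the role of AssertJ's `assertThat(...).withFailMessage(...).isGreaterThan(...)`, so the sketch has no AssertJ dependency.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RetrySnapshotSketch {

    /**
     * Hypothetical stand-in for the test-utils retry(): re-runs the assertion until
     * it passes or the timeout elapses, then rethrows the last AssertionError.
     */
    static void retry(Duration timeout, Runnable assertion) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return;
            } catch (AssertionError e) {
                if (System.nanoTime() - deadline >= 0) {
                    throw e;
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while retrying", ie);
                }
            }
        }
    }

    /** Hypothetical view of PeriodicSnapshotManager: only the calls the diff uses. */
    interface SnapshotTrigger {
        long currentSnapshotId();

        void triggerSnapshot();
    }

    /** Suggestion 1: assert that the ID advances instead of returning null. */
    static long triggerSnapshot(String tableBucket, SnapshotTrigger manager, Duration timeout) {
        long snapshotId = manager.currentSnapshotId();
        manager.triggerSnapshot();
        retry(timeout, () -> {
            // Plays the role of assertThat(current).withFailMessage(...).isGreaterThan(snapshotId)
            if (manager.currentSnapshotId() <= snapshotId) {
                throw new AssertionError(
                        "No new snapshot triggered for table bucket " + tableBucket);
            }
        });
        return snapshotId;
    }

    /** Suggestion 2: every bucket must advance, so none is silently skipped. */
    static List<Long> triggerSnapshots(Map<String, SnapshotTrigger> managers, Duration timeout) {
        List<Long> snapshotIds = new ArrayList<>();
        for (Map.Entry<String, SnapshotTrigger> entry : managers.entrySet()) {
            snapshotIds.add(triggerSnapshot(entry.getKey(), entry.getValue(), timeout));
            // The real triggerAndWaitSnapshots would call waitUntilSnapshotFinished here.
        }
        return snapshotIds;
    }
}
```

   With this shape, a snapshot that never advances surfaces as a test failure carrying the existing message, instead of a `null` that `triggerAndWaitSnapshots` would drop without waiting.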
   
   Let me know what your thoughts are.



-- 
This is an automated message from the Apache Git Service.