leekeiabstraction commented on code in PR #2808:
URL: https://github.com/apache/fluss/pull/2808#discussion_r3005238451
##########
fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java:
##########
@@ -745,33 +745,37 @@ public CompletedSnapshot triggerAndWaitSnapshot(TableBucket tableBucket) {
     }
     private Long triggerSnapshot(TableBucket tableBucket) {
-        Long snapshotId = null;
-        Long nextSnapshotId = null;
         for (TabletServer ts : tabletServers.values()) {
             ReplicaManager.HostedReplica replica = ts.getReplicaManager().getReplica(tableBucket);
             if (replica instanceof ReplicaManager.OnlineReplica) {
                 Replica r = ((ReplicaManager.OnlineReplica) replica).getReplica();
                 PeriodicSnapshotManager kvSnapshotManager = r.getKvSnapshotManager();
                 if (r.isLeader() && kvSnapshotManager != null) {
-                    snapshotId = kvSnapshotManager.currentSnapshotId();
+                    long snapshotId = kvSnapshotManager.currentSnapshotId();
                     kvSnapshotManager.triggerSnapshot();
-                    nextSnapshotId = kvSnapshotManager.currentSnapshotId();
-                    break;
+                    // Poll until the snapshot ID increments, confirming the async trigger was
+                    // processed. triggerSnapshot() submits work to a guardedExecutor
+                    // asynchronously, so the counter may not have incremented yet on return.
+                    // If the ID does not increment within the timeout, the snapshot was
+                    // legitimately skipped (e.g., no new data since last snapshot).
+                    long deadline = System.currentTimeMillis() + 1_000;
+                    while (kvSnapshotManager.currentSnapshotId() <= snapshotId) {
+                        if (System.currentTimeMillis() > deadline) {
+                            return null;
+                        }
+                        try {
+                            Thread.sleep(10);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return snapshotId;
Review Comment:
Hello @hemanthsavasere
Reviewing this because there have been recent failures related to it. The root cause looks correct, and I agree with the semantic difference between returning null and throwing an exception.
However, consider the callers of `triggerSnapshot(TableBucket tableBucket)`:
1. `CompletedSnapshot triggerAndWaitSnapshot(TableBucket tableBucket)`, which you pointed out. On a null snapshot ID, it fails with `No new snapshot triggered for table bucket <>`. `triggerSnapshot(TableBucket tableBucket)` could instead use `retry()` with an assertion on the snapshot ID increment, plus `withFailMessage()` to keep the same failure message.
2. `void triggerAndWaitSnapshots(Collection<TableBucket> tableBuckets)` currently triggers snapshots but skips `waitUntilSnapshotFinished` for null snapshot IDs. This means tests that call it may miss cases where a snapshot was never actually triggered. Adding an assertion through `retry()` would harden these tests.
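A rough sketch of what I mean, in plain Java so it compiles on its own. `retry()` and the thrown `AssertionError` here are hypothetical stand-ins for the test-utils `retry()` and AssertJ's `withFailMessage()`, and the `AtomicLong`/`Runnable` parameters stand in for `kvSnapshotManager.currentSnapshotId()` and `kvSnapshotManager.triggerSnapshot()`. Names and signatures are illustrative only, not the actual Fluss APIs.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class SnapshotRetrySketch {

    /** Assertion body that signals failure by throwing AssertionError. */
    @FunctionalInterface
    interface Assertion {
        void check();
    }

    /**
     * Hypothetical stand-in for a test-utils retry(): re-runs the assertion until it
     * passes, and rethrows the last AssertionError once the timeout has elapsed.
     */
    static void retry(Duration timeout, Assertion assertion) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.check();
                return;
            } catch (AssertionError e) {
                if (System.nanoTime() - deadline >= 0) {
                    throw e;
                }
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ie);
            }
        }
    }

    /**
     * Hypothetical reshaped triggerSnapshot: never returns null. A trigger that does not
     * advance the ID fails with the same message prefix triggerAndWaitSnapshot uses.
     */
    static long triggerSnapshot(String bucket, AtomicLong currentSnapshotId, Runnable trigger) {
        long before = currentSnapshotId.get();
        trigger.run();
        retry(
                Duration.ofSeconds(1),
                () -> {
                    if (currentSnapshotId.get() <= before) {
                        throw new AssertionError(
                                "No new snapshot triggered for table bucket " + bucket);
                    }
                });
        return before;
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Async trigger: the ID advances ~50 ms later, as with a background executor.
            AtomicLong asyncId = new AtomicLong(3);
            long id =
                    triggerSnapshot(
                            "bucket-0",
                            asyncId,
                            () -> {
                                executor.schedule(
                                        () -> {
                                            asyncId.incrementAndGet();
                                        },
                                        50,
                                        TimeUnit.MILLISECONDS);
                            });
            System.out.println("snapshot triggered after id " + id);

            // Trigger with no effect: fails after the timeout instead of returning null.
            try {
                triggerSnapshot("bucket-1", new AtomicLong(7), () -> {});
            } catch (AssertionError e) {
                System.out.println(e.getMessage());
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With this shape `triggerSnapshot` never returns null, so `triggerAndWaitSnapshots` would no longer skip `waitUntilSnapshotFinished` for some buckets: a bucket whose snapshot was never triggered fails the test instead of passing silently. The trade-off is that a legitimately skipped snapshot (no new data) would also fail, so callers would presumably need to write data before triggering.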
Let me know your thoughts.
--
This is an automated message from the Apache Git Service.