luoyuxia commented on code in PR #2920:
URL: https://github.com/apache/fluss/pull/2920#discussion_r2992184465
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -143,6 +146,13 @@ public
RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
currentEmptySplits.clear();
return records;
}
+
+ // Check droppedTables BEFORE checkSplitOrStartNext to quickly respond
to already-marked
+ // current table
+ if (currentTableId != null && droppedTables.contains(currentTableId)) {
+ return forceCompleteDroppedTable();
+ }
+
checkSplitOrStartNext();
Review Comment:
if move to a split, and then the split is for the table dropped, will
`getOrMoveToTable` cause exception and then fail the job. Maybe we can protect
this case .
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -164,7 +174,26 @@ public
RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
if (reachTieringMaxDurationTables.contains(currentTableId)) {
return forceCompleteTieringLogRecords();
}
- ScanRecords scanRecords = currentLogScanner.poll(pollTimeout);
+ ScanRecords scanRecords;
+ try {
+ scanRecords = currentLogScanner.poll(pollTimeout);
+ } catch (TableNotExistException e) {
Review Comment:
IIUC, we don't need to check this exception, just wait it to be forced
complete in next fetch.
Also, as disscuss before, poll won't throw `TableNotExistException`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]