luoyuxia commented on code in PR #2920:
URL: https://github.com/apache/fluss/pull/2920#discussion_r3013987550


##########
fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java:
##########
@@ -43,8 +45,9 @@ public interface LakeWriter<WriteResult> extends Closeable {
     /**
      * Completes the writing process and returns the write result.
      *
-     * @return the write result
+     * @return the write result, or null if no data was written (empty write scenario)

Review Comment:
   I'm curious about when it'll be null (i.e., when no data was written).
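   For context, a minimal caller-side sketch of the contract change under discussion: a null result from `complete()` means an empty write and must be skipped rather than committed. The collector type below is an illustrative stand-in, not the actual Fluss API.

   ```java
   // Hypothetical stand-in for code that consumes LakeWriter#complete results.
   // Under the new contract, a null write result signals an empty write (no data
   // produced for this bucket in the tiering round) and must be skipped.
   class WriteResultCollector {
       private int committed;

       void collect(Object writeResult) {
           if (writeResult == null) {
               return; // empty write scenario: nothing to commit downstream
           }
           committed++;
       }

       int committedCount() {
           return committed;
       }
   }
   ```

   Whether every downstream committer actually tolerates a null result this way is exactly what this question is probing.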



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java:
##########
@@ -33,7 +33,10 @@ public class TableBucketWriteResultSerializer<WriteResult>
     private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
             ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
 
-    private static final int CURRENT_VERSION = 1;
+    private static final int CURRENT_VERSION = 2;

Review Comment:
   Actually, we may not need to bump the version since it's only needed for shuffle... For code cleanliness, we can keep it as `CURRENT_VERSION = 1`.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -427,16 +432,76 @@ private void assignSplits() {
             }
         } else {
             // report heartbeat to fluss coordinator
-            waitHeartbeatResponse(coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest));
+            heartbeatResponse =
+                    waitHeartbeatResponse(
+                            coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest));
         }
 
+        // Process tiering_table_resp to detect table deletion errors
+        handleTieringTableResponseErrors(heartbeatResponse);
+
         // if come to here, we can remove currentFinishedTables/failedTableEpochs to avoid send
         // in next round
         currentFinishedTables.forEach(finishedTables::remove);
         currentFailedTableEpochs.forEach(failedTableEpochs::remove);
         return lakeTieringInfo;
     }
 
+    /**
+     * Handle errors in tiering_table_resp from heartbeat response. If a table has been dropped,
+     * mark it as failed and notify readers to skip processing.
+     */
+    private void handleTieringTableResponseErrors(LakeTieringHeartbeatResponse heartbeatResponse) {
+        for (PbHeartbeatRespForTable resp : heartbeatResponse.getTieringTableRespsList()) {
+            if (resp.hasError()) {
+                ApiError error = ApiError.fromErrorMessage(resp.getError());
+                Errors errors = error.error();
+                // Check if the error indicates table doesn't exist or tiering epoch is fenced
+                // (which happens when table is dropped and recreated)
+                if (errors == Errors.TABLE_NOT_EXIST
+                        || errors == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION

Review Comment:
   Let's not handle `UNKNOWN_TABLE_OR_BUCKET_EXCEPTION` and `FENCED_TIERING_EPOCH_EXCEPTION` here.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -427,16 +432,76 @@ private void assignSplits() {
             }
         } else {
             // report heartbeat to fluss coordinator
-            waitHeartbeatResponse(coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest));
+            heartbeatResponse =
+                    waitHeartbeatResponse(
+                            coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest));
         }
 
+        // Process tiering_table_resp to detect table deletion errors
+        handleTieringTableResponseErrors(heartbeatResponse);
+
         // if come to here, we can remove currentFinishedTables/failedTableEpochs to avoid send
         // in next round
         currentFinishedTables.forEach(finishedTables::remove);
         currentFailedTableEpochs.forEach(failedTableEpochs::remove);
         return lakeTieringInfo;
     }
 
+    /**
+     * Handle errors in tiering_table_resp from heartbeat response. If a table has been dropped,
+     * mark it as failed and notify readers to skip processing.
+     */
+    private void handleTieringTableResponseErrors(LakeTieringHeartbeatResponse heartbeatResponse) {
+        for (PbHeartbeatRespForTable resp : heartbeatResponse.getTieringTableRespsList()) {
+            if (resp.hasError()) {
+                ApiError error = ApiError.fromErrorMessage(resp.getError());
+                Errors errors = error.error();
+                // Check if the error indicates table doesn't exist or tiering epoch is fenced
+                // (which happens when table is dropped and recreated)
+                if (errors == Errors.TABLE_NOT_EXIST
+                        || errors == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION
+                        || errors == Errors.FENCED_TIERING_EPOCH_EXCEPTION) {
+                    long tableId = resp.getTableId();
+                    LOG.warn(
+                            "Table {} is dropped or epoch mismatch (error: {}), canceling tiering.",
+                            tableId,
+                            errors);
+                    handleTableDropped(tableId);
+                }
+            }
+        }
+    }
+
+    /**
+     * Handle a dropped table by marking all related splits to skip, removing from tiering epochs,
+     * and notifying readers.
+     */
+    @VisibleForTesting
+    protected void handleTableDropped(long tableId) {
+        // Remove from tiering table epochs
+        Long tieringEpoch = tieringTableEpochs.remove(tableId);

Review Comment:
   I feel like it's error-prone to remove from `tieringTableEpochs` and put into `failedTableEpochs` here.
   Another way is to treat a table drop as another failed tiering event (send a `FailedTieringEvent`), so that the whole path is unified.
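   To illustrate the suggested unification: route a drop through the same failure entry point as any other tiering failure, so the epoch-map transition lives in exactly one place. All names below are an illustrative sketch, not the actual enumerator fields or the real `FailedTieringEvent` API.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Sketch of the unified failure path: a table drop is funneled through the
   // same handler as any other tiering failure, so the
   // tieringTableEpochs -> failedTableEpochs transition happens in one place only.
   class TieringFailureRouter {
       final Map<Long, Long> tieringTableEpochs = new HashMap<>();
       final Map<Long, Long> failedTableEpochs = new HashMap<>();

       void startTiering(long tableId, long epoch) {
           tieringTableEpochs.put(tableId, epoch);
       }

       // single entry point for all tiering failures (stand-in for sending a FailedTieringEvent)
       void onTieringFailed(long tableId, String reason) {
           Long epoch = tieringTableEpochs.remove(tableId);
           if (epoch != null) {
               failedTableEpochs.put(tableId, epoch);
           }
       }

       // a drop becomes just another failure event, not a second ad-hoc state mutation
       void onTableDropped(long tableId) {
           onTieringFailed(tableId, "table dropped");
       }
   }
   ```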



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java:
##########
@@ -55,41 +55,61 @@ public TieringSourceFetcherManager(
     }
 
     public void markTableReachTieringMaxDuration(long tableId) {
+        LOG.info("Enqueueing handleTableReachTieringMaxDuration task for table {}", tableId);
+        enqueueTaskForTable(
+                tableId,
+                reader -> {
+                    LOG.debug(
+                            "Executing handleTableReachTieringMaxDuration in split reader for table {}",
+                            tableId);
+                    reader.handleTableReachTieringMaxDuration(tableId);
+                },
+                "handleTableReachTieringMaxDuration");
+    }
+
+    public void markTableDropped(long tableId) {
+        LOG.info("Enqueueing handleTableDropped task for table {}", tableId);
+        enqueueTaskForTable(
+                tableId,
+                reader -> {
+                    LOG.debug("Executing handleTableDropped in split reader for table {}", tableId);
+                    reader.handleTableDropped(tableId);
+                },
+                "handleTableDropped");
+    }
+
+    private void enqueueTaskForTable(
+            long tableId, Consumer<TieringSplitReader<WriteResult>> action, String actionDesc) {
+        SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher;
         if (!fetchers.isEmpty()) {
-            // The fetcher thread is still running. This should be the majority of the cases.
-            LOG.info("fetchers is not empty, marking tiering max duration for table {}", tableId);
-            fetchers.values()
-                    .forEach(
-                            splitFetcher ->
-                                    enqueueMarkTableReachTieringMaxDurationTask(
-                                            splitFetcher, tableId));
+            LOG.info("Fetchers are active, enqueueing {} task for table {}", actionDesc, tableId);
+            fetchers.values().forEach(f -> enqueueReaderTask(f, action));
         } else {
-            SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
-                    createSplitFetcher();
             LOG.info(
-                    "fetchers is empty, enqueue marking tiering max duration for table {}",
+                    "No active fetchers, creating new fetcher and enqueueing {} task for table {}",
+                    actionDesc,
                     tableId);
-            enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId);
+            splitFetcher = createSplitFetcher();
+            enqueueReaderTask(splitFetcher, action);
             startFetcher(splitFetcher);
         }
     }
 
-    private void enqueueMarkTableReachTieringMaxDurationTask(
+    @SuppressWarnings("unchecked")
+    private void enqueueReaderTask(
             SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
-            long reachTieringDeadlineTable) {
+            Consumer<TieringSplitReader<WriteResult>> action) {
         splitFetcher.enqueueTask(
                 new SplitFetcherTask() {
                     @Override
                     public boolean run() {
-                        ((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader())
-                                .handleTableReachTieringMaxDuration(reachTieringDeadlineTable);
+                        action.accept(
+                                (TieringSplitReader<WriteResult>) splitFetcher.getSplitReader());
                         return true;
                     }
 
                     @Override
-                    public void wakeUp() {
-                        // do nothing

Review Comment:
   please still keep the comment



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java:
##########
@@ -84,6 +87,10 @@ public PaimonWriteResult complete() throws IOException {
         } catch (Exception e) {
             throw new IOException("Failed to complete Paimon write.", e);
         }
+        if (commitMessage == null) {

Review Comment:
   Do Iceberg and Lance also need the same handling?
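   For reference, the guard being discussed reduces to the following writer-side pattern, which the Iceberg and Lance writers could mirror. The types below are illustrative, not the actual Paimon/Iceberg/Lance classes.

   ```java
   // Illustrative sketch of the null-guard in complete(): if nothing was written,
   // return null (empty write) instead of a handle for a no-op commit.
   interface CommitHandle {}

   class GenericLakeWriterSketch {
       private CommitHandle pendingCommit; // stays null when no records were written

       void recordCommit(CommitHandle handle) {
           pendingCommit = handle;
       }

       CommitHandle complete() {
           if (pendingCommit == null) {
               return null; // empty write: caller must tolerate a null result
           }
           return pendingCommit;
       }
   }
   ```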



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -570,6 +601,116 @@ public void handleTableReachTieringMaxDuration(long tableId) {
         }
     }
 
+    /**
+     * Handle a table being dropped. This will mark the table as dropped, and it will be force
+     * completed with empty results in the next fetch cycle.
+     *
+     * <p>For the currently active table, the dropped flag is set so that {@link #fetch()} detects
+     * it at the start of the next cycle and calls {@link #forceCompleteDroppedTable()}. For tables
+     * in {@code pendingTieringSplits}, the flag is also set here; those splits will be skipped when
+     * they become the active table and the dropped flag is detected.
+     *
+     * @param tableId the id of the dropped table
+     */
+    public void handleTableDropped(long tableId) {
+        LOG.info(
+                "handleTableDropped, tableId: {}, currentTableId: {}, pendingTieringSplits: {}",
+                tableId,
+                currentTableId,
+                pendingTieringSplits);
+        if ((currentTableId != null && currentTableId.equals(tableId))
+                || pendingTieringSplits.containsKey(tableId)) {
+            // Current table is being dropped, mark it for force completion in next fetch
+            LOG.info("Table {} is dropped, will force to complete with empty results.", tableId);
+            droppedTables.add(tableId);
+        }
+    }
+
+    /**
+     * Force complete tiering for a dropped table. This will close any in-progress lake writers
+     * without completing (discarding uncommitted data), then finish all remaining splits with null
+     * write results.
+     */
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forceCompleteDroppedTable()
+            throws IOException {
+        LOG.info("Force completing dropped table {}", currentTableId);
+
+        Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
+        Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+        // Generate empty results for all splits (both log and snapshot)
+        Iterator<Map.Entry<TableBucket, TieringSplit>> splitsIterator =
+                currentTableSplitsByBucket.entrySet().iterator();
+        while (splitsIterator.hasNext()) {
+            Map.Entry<TableBucket, TieringSplit> entry = splitsIterator.next();
+            TableBucket bucket = entry.getKey();
+            TieringSplit split = entry.getValue();
+            if (split != null) {
+                // Close lake writer without complete - discard data for dropped table
+                LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
+                if (lakeWriter != null) {
+                    try {
+                        lakeWriter.close();
+                    } catch (Exception e) {
+                        LOG.warn("Failed to close lake writer for bucket {}", bucket, e);
+                    }
+                }
+
+                TableBucketWriteResult<WriteResult> bucketResult =
+                        toTableBucketWriteResult(
+                                split.getTablePath(),
+                                bucket,
+                                split.getPartitionName(),
+                                null,
+                                UNKNOWN_BUCKET_OFFSET,
+                                UNKNOWN_BUCKET_TIMESTAMP,
+                                checkNotNull(currentTableNumberOfSplits),
+                                true);
+                writeResults.put(bucket, bucketResult);
+                finishedSplitIds.put(bucket, split.splitId());
+                LOG.info(
+                        "Split {} is forced to be finished due to table dropped with empty result.",
+                        split.splitId());
+                splitsIterator.remove();
+            }
+        }
+
+        // Close any remaining lake writers that don't have corresponding splits

Review Comment:
   Do we really need it?
   When would it happen that we have a lake writer but no corresponding split?



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -570,6 +601,116 @@ public void handleTableReachTieringMaxDuration(long tableId) {
         }
     }
 
+    /**
+     * Handle a table being dropped. This will mark the table as dropped, and it will be force
+     * completed with empty results in the next fetch cycle.
+     *
+     * <p>For the currently active table, the dropped flag is set so that {@link #fetch()} detects
+     * it at the start of the next cycle and calls {@link #forceCompleteDroppedTable()}. For tables
+     * in {@code pendingTieringSplits}, the flag is also set here; those splits will be skipped when
+     * they become the active table and the dropped flag is detected.
+     *
+     * @param tableId the id of the dropped table
+     */
+    public void handleTableDropped(long tableId) {
+        LOG.info(
+                "handleTableDropped, tableId: {}, currentTableId: {}, pendingTieringSplits: {}",
+                tableId,
+                currentTableId,
+                pendingTieringSplits);
+        if ((currentTableId != null && currentTableId.equals(tableId))
+                || pendingTieringSplits.containsKey(tableId)) {
+            // Current table is being dropped, mark it for force completion in next fetch
+            LOG.info("Table {} is dropped, will force to complete with empty results.", tableId);
+            droppedTables.add(tableId);
+        }
+    }
+
+    /**
+     * Force complete tiering for a dropped table. This will close any in-progress lake writers
+     * without completing (discarding uncommitted data), then finish all remaining splits with null
+     * write results.
+     */
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forceCompleteDroppedTable()
+            throws IOException {
+        LOG.info("Force completing dropped table {}", currentTableId);
+
+        Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
+        Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+        // Generate empty results for all splits (both log and snapshot)
+        Iterator<Map.Entry<TableBucket, TieringSplit>> splitsIterator =
+                currentTableSplitsByBucket.entrySet().iterator();
+        while (splitsIterator.hasNext()) {
+            Map.Entry<TableBucket, TieringSplit> entry = splitsIterator.next();
+            TableBucket bucket = entry.getKey();
+            TieringSplit split = entry.getValue();
+            if (split != null) {
+                // Close lake writer without complete - discard data for dropped table
+                LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
+                if (lakeWriter != null) {
+                    try {
+                        lakeWriter.close();
+                    } catch (Exception e) {
+                        LOG.warn("Failed to close lake writer for bucket {}", bucket, e);
+                    }
+                }
+
+                TableBucketWriteResult<WriteResult> bucketResult =
+                        toTableBucketWriteResult(
+                                split.getTablePath(),
+                                bucket,
+                                split.getPartitionName(),
+                                null,
+                                UNKNOWN_BUCKET_OFFSET,
+                                UNKNOWN_BUCKET_TIMESTAMP,
+                                checkNotNull(currentTableNumberOfSplits),
+                                true);
+                writeResults.put(bucket, bucketResult);
+                finishedSplitIds.put(bucket, split.splitId());
+                LOG.info(
+                        "Split {} is forced to be finished due to table dropped with empty result.",
+                        split.splitId());
+                splitsIterator.remove();
+            }
+        }
+
+        // Close any remaining lake writers that don't have corresponding splits
+        for (Map.Entry<TableBucket, LakeWriter<WriteResult>> entry : lakeWriters.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close orphan lake writer for bucket {}", entry.getKey(), e);
+            }
+        }
+        lakeWriters.clear();
+
+        // Also handle pending snapshot splits for this table

Review Comment:
   We don't need to handle pending snapshot splits again, since they must already be in `currentTableSplitsByBucket`. See method:
   ```
   private void addSplitToCurrentTable(TieringSplit split) {
           this.currentTableSplitsByBucket.put(split.getTableBucket(), split);
           if (split.isTieringSnapshotSplit()) {
               this.currentPendingSnapshotSplits.add((TieringSnapshotSplit) split);
           } else if (split.isTieringLogSplit()) {
               subscribeLog((TieringLogSplit) split);
           }
       }
   ```
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -251,6 +260,25 @@ private void checkSplitOrStartNext() {
         }
 
         Set<TieringSplit> pendingSplits = pendingTieringSplits.remove(pendingTableId);
+
+        // If the pending table is already dropped, set minimal table state from split metadata
+        // without calling getOrMoveToTable() to avoid RPC exception (TableNotExistException).
+        // The next fetch() cycle will detect the dropped flag and call forceCompleteDroppedTable().
+        if (droppedTables.contains(pendingTableId)) {
+            TieringSplit firstSplit = pendingSplits.iterator().next();
+            currentTableId = pendingTableId;
+            currentTablePath = firstSplit.getTablePath();
+            currentTableNumberOfSplits = firstSplit.getNumberOfSplits();
+            for (TieringSplit split : pendingSplits) {
+                currentTableSplitsByBucket.put(split.getTableBucket(), split);
+            }
+            LOG.info(
+                    "Skipping RPC for dropped table {} (path: {}), will force complete in next fetch cycle.",

Review Comment:
   nit:
   
   ```suggestion
                       "Table {} is already marked dropped; skip opening table {} and force complete in next fetch cycle.",
   ```
   `Skipping RPC` looks strange to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
