luoyuxia commented on code in PR #2920:
URL: https://github.com/apache/fluss/pull/2920#discussion_r3013987550
##########
fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java:
##########
@@ -43,8 +45,9 @@ public interface LakeWriter<WriteResult> extends Closeable {
/**
* Completes the writing process and returns the write result.
*
- * @return the write result
+ * @return the write result, or null if no data was written (empty write scenario)
Review Comment:
I'm curious about when it'll be null (i.e., when no data was written).
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java:
##########
@@ -33,7 +33,10 @@ public class TableBucketWriteResultSerializer<WriteResult>
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
- private static final int CURRENT_VERSION = 1;
+ private static final int CURRENT_VERSION = 2;
Review Comment:
Actually, we may not need to bump the version since it's only needed for
shuffle... For code cleanliness, we can still keep it as `CURRENT_VERSION = 1`.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -427,16 +432,76 @@ private void assignSplits() {
}
} else {
// report heartbeat to fluss coordinator
-            waitHeartbeatResponse(coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest));
+            heartbeatResponse =
+                    waitHeartbeatResponse(
+                            coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest));
}
+ // Process tiering_table_resp to detect table deletion errors
+ handleTieringTableResponseErrors(heartbeatResponse);
+
        // if come to here, we can remove currentFinishedTables/failedTableEpochs to avoid send
        // in next round
currentFinishedTables.forEach(finishedTables::remove);
currentFailedTableEpochs.forEach(failedTableEpochs::remove);
return lakeTieringInfo;
}
+    /**
+     * Handle errors in tiering_table_resp from heartbeat response. If a table has been dropped,
+     * mark it as failed and notify readers to skip processing.
+     */
+    private void handleTieringTableResponseErrors(LakeTieringHeartbeatResponse heartbeatResponse) {
+        for (PbHeartbeatRespForTable resp : heartbeatResponse.getTieringTableRespsList()) {
+            if (resp.hasError()) {
+                ApiError error = ApiError.fromErrorMessage(resp.getError());
+                Errors errors = error.error();
+                // Check if the error indicates table doesn't exist or tiering epoch is fenced
+                // (which happens when table is dropped and recreated)
+                if (errors == Errors.TABLE_NOT_EXIST
+                        || errors == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION
Review Comment:
No need to handle `UNKNOWN_TABLE_OR_BUCKET_EXCEPTION` and
`FENCED_TIERING_EPOCH_EXCEPTION` here.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -427,16 +432,76 @@ private void assignSplits() {
}
} else {
// report heartbeat to fluss coordinator
-            waitHeartbeatResponse(coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest));
+            heartbeatResponse =
+                    waitHeartbeatResponse(
+                            coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest));
}
+ // Process tiering_table_resp to detect table deletion errors
+ handleTieringTableResponseErrors(heartbeatResponse);
+
        // if come to here, we can remove currentFinishedTables/failedTableEpochs to avoid send
        // in next round
currentFinishedTables.forEach(finishedTables::remove);
currentFailedTableEpochs.forEach(failedTableEpochs::remove);
return lakeTieringInfo;
}
+    /**
+     * Handle errors in tiering_table_resp from heartbeat response. If a table has been dropped,
+     * mark it as failed and notify readers to skip processing.
+     */
+    private void handleTieringTableResponseErrors(LakeTieringHeartbeatResponse heartbeatResponse) {
+        for (PbHeartbeatRespForTable resp : heartbeatResponse.getTieringTableRespsList()) {
+            if (resp.hasError()) {
+                ApiError error = ApiError.fromErrorMessage(resp.getError());
+                Errors errors = error.error();
+                // Check if the error indicates table doesn't exist or tiering epoch is fenced
+                // (which happens when table is dropped and recreated)
+                if (errors == Errors.TABLE_NOT_EXIST
+                        || errors == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION
+                        || errors == Errors.FENCED_TIERING_EPOCH_EXCEPTION) {
+                    long tableId = resp.getTableId();
+                    LOG.warn(
+                            "Table {} is dropped or epoch mismatch (error: {}), canceling tiering.",
+                            tableId,
+                            errors);
+                    handleTableDropped(tableId);
+                }
+            }
+        }
+    }
+
+    /**
+     * Handle a dropped table by marking all related splits to skip, removing from tiering epochs,
+     * and notifying readers.
+     */
+    @VisibleForTesting
+    protected void handleTableDropped(long tableId) {
+        // Remove from tiering table epochs
+        Long tieringEpoch = tieringTableEpochs.remove(tableId);
Review Comment:
I feel like it's error-prone here to remove from `tieringTableEpochs` and put
into `failedTableEpochs`.
Another way is to treat a table drop as another failed-tiering event (send a
`FailedTieringEvent`), so that the whole path is unified.
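The unified-failure idea could look roughly like this; a minimal, self-contained sketch, assuming hypothetical `tieringTableEpochs`/`failedTableEpochs` maps and a `FailedTieringEvent`-style notification (the names mirror this discussion, not the actual Fluss classes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of routing a table drop through the same failure path as any
// other tiering failure, so a single place moves the epoch from "in progress"
// to "failed". All names here are illustrative, not the Fluss API.
class TieringFailurePathSketch {
    final Map<Long, Long> tieringTableEpochs = new HashMap<>();
    final Map<Long, Long> failedTableEpochs = new HashMap<>();
    final List<String> sentEvents = new ArrayList<>();

    // Unified failure handler: used for ordinary tiering failures and,
    // per the review suggestion, for table drops as well.
    void onTieringFailed(long tableId, String reason) {
        Long epoch = tieringTableEpochs.remove(tableId);
        if (epoch != null) {
            failedTableEpochs.put(tableId, epoch);
            // Stands in for sending a FailedTieringEvent to the coordinator.
            sentEvents.add("FailedTieringEvent(table=" + tableId + ", reason=" + reason + ")");
        }
    }

    // A drop is just another failure; no bespoke cleanup path.
    void onTableDropped(long tableId) {
        onTieringFailed(tableId, "table dropped");
    }
}
```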
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java:
##########
@@ -55,41 +55,61 @@ public TieringSourceFetcherManager(
}
    public void markTableReachTieringMaxDuration(long tableId) {
+        LOG.info("Enqueueing handleTableReachTieringMaxDuration task for table {}", tableId);
+        enqueueTaskForTable(
+                tableId,
+                reader -> {
+                    LOG.debug(
+                            "Executing handleTableReachTieringMaxDuration in split reader for table {}",
+                            tableId);
+                    reader.handleTableReachTieringMaxDuration(tableId);
+                },
+                "handleTableReachTieringMaxDuration");
+    }
+
+    public void markTableDropped(long tableId) {
+        LOG.info("Enqueueing handleTableDropped task for table {}", tableId);
+        enqueueTaskForTable(
+                tableId,
+                reader -> {
+                    LOG.debug("Executing handleTableDropped in split reader for table {}", tableId);
+                    reader.handleTableDropped(tableId);
+                },
+                "handleTableDropped");
+    }
+
+    private void enqueueTaskForTable(
+            long tableId, Consumer<TieringSplitReader<WriteResult>> action, String actionDesc) {
+        SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher;
        if (!fetchers.isEmpty()) {
-            // The fetcher thread is still running. This should be the majority of the cases.
-            LOG.info("fetchers is not empty, marking tiering max duration for table {}", tableId);
-            fetchers.values()
-                    .forEach(
-                            splitFetcher ->
-                                    enqueueMarkTableReachTieringMaxDurationTask(
-                                            splitFetcher, tableId));
+            LOG.info("Fetchers are active, enqueueing {} task for table {}", actionDesc, tableId);
+            fetchers.values().forEach(f -> enqueueReaderTask(f, action));
        } else {
-            SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
-                    createSplitFetcher();
            LOG.info(
-                    "fetchers is empty, enqueue marking tiering max duration for table {}",
+                    "No active fetchers, creating new fetcher and enqueueing {} task for table {}",
+                    actionDesc,
                    tableId);
-            enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId);
+            splitFetcher = createSplitFetcher();
+            enqueueReaderTask(splitFetcher, action);
            startFetcher(splitFetcher);
        }
    }
-    private void enqueueMarkTableReachTieringMaxDurationTask(
+    @SuppressWarnings("unchecked")
+    private void enqueueReaderTask(
            SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
-            long reachTieringDeadlineTable) {
+            Consumer<TieringSplitReader<WriteResult>> action) {
        splitFetcher.enqueueTask(
                new SplitFetcherTask() {
                    @Override
                    public boolean run() {
-                        ((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader())
-                                .handleTableReachTieringMaxDuration(reachTieringDeadlineTable);
+                        action.accept(
+                                (TieringSplitReader<WriteResult>) splitFetcher.getSplitReader());
                        return true;
                    }
                    @Override
-                    public void wakeUp() {
-                        // do nothing
Review Comment:
please still keep the comment
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java:
##########
@@ -84,6 +87,10 @@ public PaimonWriteResult complete() throws IOException {
} catch (Exception e) {
throw new IOException("Failed to complete Paimon write.", e);
}
+ if (commitMessage == null) {
Review Comment:
Do the Iceberg and Lance writers also need the same handling?
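For reference, the empty-write guard under discussion is roughly this pattern; a hedged, self-contained sketch with illustrative names, not the actual Paimon/Iceberg/Lance writer code:

```java
import java.io.IOException;

// Sketch of the null-on-empty-write pattern from the Paimon change, which the
// Iceberg and Lance writers would likely need too. Names are illustrative.
class EmptyWriteGuardSketch {
    private int recordsWritten = 0;

    void write(String record) {
        recordsWritten++;
    }

    // Returns null when nothing was written (empty write scenario), so the
    // caller can distinguish "no data" from a real commit message.
    String complete() throws IOException {
        if (recordsWritten == 0) {
            return null;
        }
        return "commit(" + recordsWritten + " records)";
    }
}
```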
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -570,6 +601,116 @@ public void handleTableReachTieringMaxDuration(long tableId) {
}
}
+    /**
+     * Handle a table being dropped. This will mark the table as dropped, and it will be force
+     * completed with empty results in the next fetch cycle.
+     *
+     * <p>For the currently active table, the dropped flag is set so that {@link #fetch()} detects
+     * it at the start of the next cycle and calls {@link #forceCompleteDroppedTable()}. For tables
+     * in {@code pendingTieringSplits}, the flag is also set here; those splits will be skipped when
+     * they become the active table and the dropped flag is detected.
+     *
+     * @param tableId the id of the dropped table
+     */
+    public void handleTableDropped(long tableId) {
+        LOG.info(
+                "handleTableDropped, tableId: {}, currentTableId: {}, pendingTieringSplits: {}",
+                tableId,
+                currentTableId,
+                pendingTieringSplits);
+        if ((currentTableId != null && currentTableId.equals(tableId))
+                || pendingTieringSplits.containsKey(tableId)) {
+            // Current table is being dropped, mark it for force completion in next fetch
+            LOG.info("Table {} is dropped, will force to complete with empty results.", tableId);
+            droppedTables.add(tableId);
+        }
+    }
+
+    /**
+     * Force complete tiering for a dropped table. This will close any in-progress lake writers
+     * without completing (discarding uncommitted data), then finish all remaining splits with null
+     * write results.
+     */
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forceCompleteDroppedTable()
+            throws IOException {
+        LOG.info("Force completing dropped table {}", currentTableId);
+
+        Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
+        Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+        // Generate empty results for all splits (both log and snapshot)
+        Iterator<Map.Entry<TableBucket, TieringSplit>> splitsIterator =
+                currentTableSplitsByBucket.entrySet().iterator();
+        while (splitsIterator.hasNext()) {
+            Map.Entry<TableBucket, TieringSplit> entry = splitsIterator.next();
+            TableBucket bucket = entry.getKey();
+            TieringSplit split = entry.getValue();
+            if (split != null) {
+                // Close lake writer without complete - discard data for dropped table
+                LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
+                if (lakeWriter != null) {
+                    try {
+                        lakeWriter.close();
+                    } catch (Exception e) {
+                        LOG.warn("Failed to close lake writer for bucket {}", bucket, e);
+                    }
+                }
+
+                TableBucketWriteResult<WriteResult> bucketResult =
+                        toTableBucketWriteResult(
+                                split.getTablePath(),
+                                bucket,
+                                split.getPartitionName(),
+                                null,
+                                UNKNOWN_BUCKET_OFFSET,
+                                UNKNOWN_BUCKET_TIMESTAMP,
+                                checkNotNull(currentTableNumberOfSplits),
+                                true);
+                writeResults.put(bucket, bucketResult);
+                finishedSplitIds.put(bucket, split.splitId());
+                LOG.info(
+                        "Split {} is forced to be finished due to table dropped with empty result.",
+                        split.splitId());
+                splitsIterator.remove();
+            }
+        }
+
+        // Close any remaining lake writers that don't have corresponding splits
Review Comment:
do we really need it?
When would it happen that we have a lake writer but no corresponding split?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -570,6 +601,116 @@ public void handleTableReachTieringMaxDuration(long tableId) {
}
}
+    /**
+     * Handle a table being dropped. This will mark the table as dropped, and it will be force
+     * completed with empty results in the next fetch cycle.
+     *
+     * <p>For the currently active table, the dropped flag is set so that {@link #fetch()} detects
+     * it at the start of the next cycle and calls {@link #forceCompleteDroppedTable()}. For tables
+     * in {@code pendingTieringSplits}, the flag is also set here; those splits will be skipped when
+     * they become the active table and the dropped flag is detected.
+     *
+     * @param tableId the id of the dropped table
+     */
+    public void handleTableDropped(long tableId) {
+        LOG.info(
+                "handleTableDropped, tableId: {}, currentTableId: {}, pendingTieringSplits: {}",
+                tableId,
+                currentTableId,
+                pendingTieringSplits);
+        if ((currentTableId != null && currentTableId.equals(tableId))
+                || pendingTieringSplits.containsKey(tableId)) {
+            // Current table is being dropped, mark it for force completion in next fetch
+            LOG.info("Table {} is dropped, will force to complete with empty results.", tableId);
+            droppedTables.add(tableId);
+        }
+    }
+
+    /**
+     * Force complete tiering for a dropped table. This will close any in-progress lake writers
+     * without completing (discarding uncommitted data), then finish all remaining splits with null
+     * write results.
+     */
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forceCompleteDroppedTable()
+            throws IOException {
+        LOG.info("Force completing dropped table {}", currentTableId);
+
+        Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
+        Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+        // Generate empty results for all splits (both log and snapshot)
+        Iterator<Map.Entry<TableBucket, TieringSplit>> splitsIterator =
+                currentTableSplitsByBucket.entrySet().iterator();
+        while (splitsIterator.hasNext()) {
+            Map.Entry<TableBucket, TieringSplit> entry = splitsIterator.next();
+            TableBucket bucket = entry.getKey();
+            TieringSplit split = entry.getValue();
+            if (split != null) {
+                // Close lake writer without complete - discard data for dropped table
+                LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
+                if (lakeWriter != null) {
+                    try {
+                        lakeWriter.close();
+                    } catch (Exception e) {
+                        LOG.warn("Failed to close lake writer for bucket {}", bucket, e);
+                    }
+                }
+
+                TableBucketWriteResult<WriteResult> bucketResult =
+                        toTableBucketWriteResult(
+                                split.getTablePath(),
+                                bucket,
+                                split.getPartitionName(),
+                                null,
+                                UNKNOWN_BUCKET_OFFSET,
+                                UNKNOWN_BUCKET_TIMESTAMP,
+                                checkNotNull(currentTableNumberOfSplits),
+                                true);
+                writeResults.put(bucket, bucketResult);
+                finishedSplitIds.put(bucket, split.splitId());
+                LOG.info(
+                        "Split {} is forced to be finished due to table dropped with empty result.",
+                        split.splitId());
+                splitsIterator.remove();
+            }
+        }
+
+        // Close any remaining lake writers that don't have corresponding splits
+        for (Map.Entry<TableBucket, LakeWriter<WriteResult>> entry : lakeWriters.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close orphan lake writer for bucket {}", entry.getKey(), e);
+            }
+        }
+        lakeWriters.clear();
+
+        // Also handle pending snapshot splits for this table
Review Comment:
we don't need to handle pending snapshot splits again since it must be in
`currentTableSplitsByBucket`. See method
```
private void addSplitToCurrentTable(TieringSplit split) {
    this.currentTableSplitsByBucket.put(split.getTableBucket(), split);
    if (split.isTieringSnapshotSplit()) {
        this.currentPendingSnapshotSplits.add((TieringSnapshotSplit) split);
    } else if (split.isTieringLogSplit()) {
        subscribeLog((TieringLogSplit) split);
    }
}
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -251,6 +260,25 @@ private void checkSplitOrStartNext() {
}
        Set<TieringSplit> pendingSplits = pendingTieringSplits.remove(pendingTableId);
+
+        // If the pending table is already dropped, set minimal table state from split metadata
+        // without calling getOrMoveToTable() to avoid RPC exception (TableNotExistException).
+        // The next fetch() cycle will detect the dropped flag and call forceCompleteDroppedTable().
+        if (droppedTables.contains(pendingTableId)) {
+            TieringSplit firstSplit = pendingSplits.iterator().next();
+            currentTableId = pendingTableId;
+            currentTablePath = firstSplit.getTablePath();
+            currentTableNumberOfSplits = firstSplit.getNumberOfSplits();
+            for (TieringSplit split : pendingSplits) {
+                currentTableSplitsByBucket.put(split.getTableBucket(), split);
+            }
+            LOG.info(
+                    "Skipping RPC for dropped table {} (path: {}), will force complete in next fetch cycle.",
Review Comment:
nit:
```suggestion
                    "Table {} is already marked dropped; skip opening table {} and force complete in next fetch cycle.",
```
`Skipping RPC` looks strange to me
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]