LiebingYu opened a new issue, #2891: URL: https://github.com/apache/fluss/issues/2891
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Description

Currently, when the Coordinator restarts, it calls `resumeDeletions()` synchronously inside `TableManager.startup()`. This causes all pending table/partition deletions accumulated from previous runs to be triggered at once, flooding the `CoordinatorEventManager` queue with a large number of `DeleteReplicaResponseReceivedEvent` events. This blocks the processing of other critical coordinator events, such as leader election and ISR adjustments, posing a significant risk during rolling upgrades.

**Root Cause:**

The deletion workflow in Fluss is a two-phase process:

1. **Phase 1** (synchronous): Drop the logical metadata node from ZooKeeper (e.g., `/tables/{tablePath}` for a table, or the partition metadata node for a partition) via `MetadataManager.dropTable()` / `dropPartition()`.
2. **Phase 2** (asynchronous): Send `StopReplica(deleteLocal=true, deleteRemote=true)` to all `TabletServer`s, and only after all responses succeed, delete the assignment node (e.g., `/tabletservers/tables/{tableId}`) via `MetadataManager.completeDeleteTable()` / `completeDeletePartition()`.

The "in-progress" deletion state (`ReplicaDeletionStarted`) lives **only in memory** inside the `CoordinatorContext`. If the Coordinator or a `TabletServer` restarts between Phase 1 and Phase 2, the assignment ZooKeeper node is still present. Upon the next Coordinator startup, `loadTableAssignment()` and `loadPartitionAssignment()` detect these "orphaned" assignment nodes (whose logical metadata has already been removed) and re-enqueue them for deletion via `queueTableDeletion()` / `queuePartitionDeletion()`. Since the in-memory `ReplicaState` has been reset, all of these deletions are immediately eligible and are triggered together in a single `resumeDeletions()` call during startup.
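To make the failure mode concrete, here is a minimal, self-contained sketch (not Fluss code; the paths and field names are illustrative) of how a restart between Phase 1 and Phase 2 produces orphaned assignment nodes that all become deletable at once on startup:

```java
import java.util.*;

/**
 * Illustrative sketch only: simulates persistent ZooKeeper state as plain
 * sets/maps and shows why deletion state is lost across a Coordinator restart.
 */
public class OrphanedAssignmentDemo {
    // Persistent ZooKeeper state (survives restarts), simulated in memory.
    static final Set<String> metadataNodes = new HashSet<>();          // e.g. /tables/{tablePath}
    static final Map<String, String> assignmentNodes = new HashMap<>(); // assignment path -> metadata path

    // In-memory deletion state (ReplicaDeletionStarted); lost on restart.
    static final Set<String> deletionsInProgress = new HashSet<>();

    // Deletions re-queued by the startup scan.
    static final List<String> requeued = new ArrayList<>();

    public static void main(String[] args) {
        // While the table is alive, both nodes exist.
        metadataNodes.add("/tables/db.t1");
        assignmentNodes.put("/tabletservers/tables/1001", "/tables/db.t1");

        // Phase 1 (synchronous): the logical metadata node is dropped.
        metadataNodes.remove("/tables/db.t1");
        deletionsInProgress.add("1001"); // tracked only in memory

        // Coordinator restarts between Phase 1 and Phase 2:
        deletionsInProgress.clear();     // in-memory state is gone ...
        // ... but the assignment node persisted in "ZooKeeper".

        // Startup scan: any assignment node whose metadata is already gone
        // is orphaned and gets re-queued for deletion.
        for (Map.Entry<String, String> e : assignmentNodes.entrySet()) {
            if (!metadataNodes.contains(e.getValue())) {
                requeued.add(e.getKey());
            }
        }
        System.out.println("re-queued deletions at startup: " + requeued);
    }
}
```

With many expired auto-partitions in this state, `requeued` holds N entries, and all N are dispatched by the single `resumeDeletions()` call during startup.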
This problem is particularly severe for partitioned tables with auto-partition enabled, where a large number of expired partitions may have been deleted (Phase 1 completed) but their assignment nodes are still present in ZooKeeper.

**Example Flow:**

```
Coordinator startup
└── TableManager.startup()
    ├── replicaStateMachine.startup()   // in-memory state is brand new
    └── resumeDeletions()               // all N pending deletions triggered at once
        └── onDeleteTable() / onDeletePartition() for each pending item
            └── replicaStateMachine.handleStateChanges(..., ReplicaDeletionStarted)
                └── StopReplica(delete=true) sent to TabletServers
                    └── TabletServer responds
                        └── N * DeleteReplicaResponseReceivedEvent enqueued
                            └── CoordinatorEventThread blocked
```

**Proposed Solution:**

Replace the direct call to `resumeDeletions()` in `TableManager.startup()` with a scheduled, periodic mechanism that dispatches a `ResumeDeletionEvent` into the `CoordinatorEventManager`. This ensures:

1. **Deletion is decoupled from startup**: The Coordinator becomes available immediately after startup without being blocked by bulk deletions.
2. **Thread safety is preserved**: The deletion logic (`resumeDeletions()`) continues to run exclusively on the `CoordinatorEventThread`, avoiding any concurrency issues with `CoordinatorContext` and `CoordinatorRequestBatch`.
3. **Interleaving with normal events**: Since deletion events are dispatched periodically and processed in the same queue as other coordinator events, they naturally interleave with leader election, ISR adjustments, and other critical operations.

### Willingness to contribute

- [x] I'm willing to submit a PR!
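The proposed mechanism can be sketched as follows. This is a minimal illustration under assumed names (`CoordinatorEvent`, the event queue, and the scheduling interval are placeholders, not the actual Fluss API): a scheduler periodically enqueues a resume-deletion event into the same queue that serves all coordinator events, so deletions still execute on the single event thread but share the queue with other work instead of flooding it at startup.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative sketch of periodic ResumeDeletionEvent dispatch (not Fluss code). */
public class ResumeDeletionSketch {
    interface CoordinatorEvent { void process(); }

    static final BlockingQueue<CoordinatorEvent> eventQueue = new LinkedBlockingQueue<>();
    static final AtomicInteger processed = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        // Single event thread: the only place coordinator state is mutated,
        // which preserves the existing thread-safety guarantees.
        Thread eventThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    eventQueue.take().process();
                }
            } catch (InterruptedException ignored) {
                // shutting down
            }
        });
        eventThread.start();

        // Instead of one bulk resumeDeletions() call in startup(), dispatch a
        // resume-deletion event periodically (interval chosen arbitrarily here).
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> eventQueue.offer(() -> {
                    processed.incrementAndGet();
                    System.out.println("processing a batch of pending deletions");
                }),
                0, 100, TimeUnit.MILLISECONDS);

        // Other coordinator events share the same queue and interleave naturally.
        eventQueue.offer(() -> {
            processed.incrementAndGet();
            System.out.println("leader election event");
        });

        Thread.sleep(300);
        scheduler.shutdownNow();
        eventThread.interrupt();
    }
}
```

A natural refinement of this scheme is to cap how many pending deletions each `ResumeDeletionEvent` handles, so even a single event stays bounded in cost.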
