LiebingYu opened a new issue, #2861:
URL: https://github.com/apache/fluss/issues/2861

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.
   
   
   ### Description
   
   **Task Overview:**
   
   Fix partition expiration in the Fluss Tiering Service for Paimon lake tables. Currently, when a Paimon lake table is configured with `partition.expiration-time`, partition expiration never fires during Tiering Service execution, even after many tiering rounds.
   
   **Root Cause:**
   
   This is caused by a lifecycle mismatch between the Tiering Service and Paimon's partition expiration mechanism.
   
   In `PaimonLakeCommitter`, every tiering round creates a brand-new `TableCommitImpl` via `fileStoreTable.newCommit(...)`. Paimon's `TableCommitImpl` in turn constructs a fresh `PartitionExpire` each time, whose `lastCheck` is initialized to approximately `now` (minus a random offset of less than one `checkInterval`):
   
   ```java
   // PartitionExpire constructor
   long rndSeconds = 0; // random jitter in [0, checkIntervalSeconds)
   long checkIntervalSeconds = checkInterval.toMillis() / 1000;
   if (checkIntervalSeconds > 0) {
       rndSeconds = ThreadLocalRandom.current().nextLong(checkIntervalSeconds);
   }
   this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds);
   ```
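
   The jitter in this constructor only shifts *when* the first check can happen. A minimal sketch, assuming the excerpt above is the complete initialization: the first moment condition (2) can hold is `checkInterval - rndSeconds` after construction. `FirstCheckWindow` and its methods are hypothetical names, not Paimon API; only the offset computation is copied from the excerpt. Because `ThreadLocalRandom.nextLong(bound)` excludes `bound`, a whole-second interval always leaves at least one second before the first possible check.

   ```java
   import java.time.Duration;
   import java.util.concurrent.ThreadLocalRandom;

   // Hypothetical helper (not Paimon code): reproduces the constructor's jitter and
   // reports how long after construction condition (2) can first become true.
   final class FirstCheckWindow {

       // Same computation as the excerpt: an offset in [0, checkIntervalSeconds).
       static long randomOffsetSeconds(Duration checkInterval) {
           long rndSeconds = 0;
           long checkIntervalSeconds = checkInterval.toMillis() / 1000;
           if (checkIntervalSeconds > 0) {
               rndSeconds = ThreadLocalRandom.current().nextLong(checkIntervalSeconds);
           }
           return rndSeconds;
       }

       // lastCheck = createdAt - rndSeconds, so lastCheck + checkInterval
       // lies (checkInterval - rndSeconds) after createdAt.
       static Duration delayUntilFirstCheck(Duration checkInterval, long rndSeconds) {
           return checkInterval.minusSeconds(rndSeconds);
       }

       public static void main(String[] args) {
           Duration checkInterval = Duration.ofHours(1); // example value
           long maxOffset = checkInterval.toMillis() / 1000 - 1;

           System.out.println(delayUntilFirstCheck(checkInterval, maxOffset)); // PT1S
           System.out.println(delayUntilFirstCheck(checkInterval, 0));         // PT1H

           // A drawn offset always yields a delay inside that window.
           long rnd = randomOffsetSeconds(checkInterval);
           System.out.println(delayUntilFirstCheck(checkInterval, rnd));
       }
   }
   ```

   In this model, for a whole-second interval, a `PartitionExpire` whose `expire()` call comes within one second of construction can never satisfy condition (2), whatever offset it draws.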
   
   The `expire()` method has three trigger conditions:
   
   ```java
   List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) {
       if (checkInterval.isZero()                                        // (1)
               || now.isAfter(lastCheck.plus(checkInterval))             // (2)
               || (endInputCheckPartitionExpire                          // (3)
                       && Long.MAX_VALUE == commitIdentifier)) {
           // ... do expire
       }
   }
   ```
   
   Since `lastCheck` is reset on every tiering round, condition (2) is **always false** within the lifetime of a single short-lived `PartitionExpire`: the round commits before `lastCheck.plus(checkInterval)` is reached. `checkInterval` is not zero by default, so condition (1) doesn't help either.
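
   The trigger logic and the lifecycle argument can be checked with a minimal, self-contained model. This is a hypothetical Java sketch, not Paimon's `PartitionExpire`: `ExpireTriggerModel` and `wouldExpire` are invented names, the random offset is passed in explicitly so the run is deterministic, and resetting `lastCheck` after a check fires is an assumption about the elided `// ... do expire` branch. It tries every offset a one-hour interval could draw, with a commit 500 ms after construction, then asks a long-lived instance again two hours later.

   ```java
   import java.time.Duration;
   import java.time.LocalDateTime;

   // Hypothetical, simplified model of the trigger check in PartitionExpire.expire().
   // Not Paimon code: it only mirrors the three conditions quoted above.
   final class ExpireTriggerModel {

       private final Duration checkInterval;
       private final boolean endInputCheckPartitionExpire;
       private LocalDateTime lastCheck;

       ExpireTriggerModel(Duration checkInterval, boolean endInputCheckPartitionExpire,
                          LocalDateTime createdAt, long rndSeconds) {
           this.checkInterval = checkInterval;
           this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
           // Mirrors the constructor excerpt, with the random offset passed in.
           this.lastCheck = createdAt.minusSeconds(rndSeconds);
       }

       // Returns true when expire() would enter its expiration branch.
       boolean wouldExpire(LocalDateTime now, long commitIdentifier) {
           boolean fire = checkInterval.isZero()                          // (1)
                   || now.isAfter(lastCheck.plus(checkInterval))          // (2)
                   || (endInputCheckPartitionExpire
                           && Long.MAX_VALUE == commitIdentifier);        // (3)
           if (fire) {
               lastCheck = now; // assumption: a completed check restarts the interval
           }
           return fire;
       }

       public static void main(String[] args) {
           Duration checkInterval = Duration.ofHours(1);
           LocalDateTime createdAt = LocalDateTime.of(2024, 1, 1, 0, 0);
           LocalDateTime commitAt = createdAt.plus(Duration.ofMillis(500));

           // Tiering-style: a fresh object per round, committed 500 ms after
           // construction, tried with every offset the constructor could draw.
           int fired = 0;
           for (long rnd = 0; rnd < checkInterval.getSeconds(); rnd++) {
               ExpireTriggerModel perRound =
                       new ExpireTriggerModel(checkInterval, false, createdAt, rnd);
               if (perRound.wouldExpire(commitAt, Long.MAX_VALUE)) {
                   fired++;
               }
           }
           System.out.println("fresh object fired " + fired + " times"); // prints "fresh object fired 0 times"

           // Long-lived: one object, asked again two hours after construction.
           ExpireTriggerModel longLived =
                   new ExpireTriggerModel(checkInterval, false, createdAt, 0);
           System.out.println(longLived.wouldExpire(createdAt.plusHours(2), 1L)); // true
       }
   }
   ```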
   
   In a standard **Flink Batch job**, `TableCommitImpl` is long-lived, so condition (2) is eventually satisfied. Additionally, Paimon provides `end-input.check-partition-expire`: when this option is enabled and the batch job ends, the final commit uses `commitIdentifier = Long.MAX_VALUE` (`BatchWriteBuilder.COMMIT_IDENTIFIER`), which satisfies condition (3) regardless of `lastCheck`.
   
   The Tiering Service has neither property:
   - `TableCommitImpl` / `PartitionExpire` is recreated on every tiering round, so condition (2) is never satisfied.
   - `commitIdentifier` is always `Long.MAX_VALUE` (the same `COMMIT_IDENTIFIER`), but `endInputCheckPartitionExpire` defaults to `false`, so condition (3) never fires.
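
   The two cases above can be replayed in a small simulation that compares the Tiering Service's lifecycle with a long-lived committer. It is a hypothetical Java sketch, not Fluss or Paimon code, and every parameter is an assumption: a one-hour check interval, one tiering round every three minutes for 24 hours, and each round's `PartitionExpire` being constructed immediately before its commit. `perRoundCommitter` is run with `end-input.check-partition-expire` off and on; `longLivedCommitter` models a single `TableCommitImpl` kept across all commits.

   ```java
   import java.time.Duration;
   import java.time.LocalDateTime;
   import java.util.concurrent.ThreadLocalRandom;

   // Hypothetical simulation (not Fluss or Paimon code): replays a day of commits
   // against the three trigger conditions of PartitionExpire.expire().
   final class TieringRoundsSimulation {

       static final Duration CHECK_INTERVAL = Duration.ofHours(1);  // assumed
       static final Duration ROUND_SPACING = Duration.ofMinutes(3); // assumed tiering cadence
       static final int ROUNDS = 480;                               // 24 hours of rounds
       static final long COMMIT_IDENTIFIER = Long.MAX_VALUE;        // as described in the issue
       static final LocalDateTime START = LocalDateTime.of(2024, 1, 1, 0, 0);

       static boolean fires(LocalDateTime lastCheck, LocalDateTime now,
                            boolean endInputCheck, long commitIdentifier) {
           return CHECK_INTERVAL.isZero()                                   // (1)
                   || now.isAfter(lastCheck.plus(CHECK_INTERVAL))           // (2)
                   || (endInputCheck && Long.MAX_VALUE == commitIdentifier); // (3)
       }

       // Tiering Service: a new PartitionExpire per round, built just before the commit.
       static int perRoundCommitter(boolean endInputCheck) {
           int fired = 0;
           for (int i = 0; i < ROUNDS; i++) {
               LocalDateTime now = START.plus(ROUND_SPACING.multipliedBy(i));
               long rnd = ThreadLocalRandom.current().nextLong(CHECK_INTERVAL.getSeconds());
               LocalDateTime lastCheck = now.minusSeconds(rnd); // fresh, jittered lastCheck
               if (fires(lastCheck, now, endInputCheck, COMMIT_IDENTIFIER)) {
                   fired++;
               }
           }
           return fired;
       }

       // Long-lived committer: one lastCheck carried across every round.
       static int longLivedCommitter() {
           LocalDateTime lastCheck = START; // offset 0, for a deterministic count
           int fired = 0;
           for (int i = 0; i < ROUNDS; i++) {
               LocalDateTime now = START.plus(ROUND_SPACING.multipliedBy(i));
               if (fires(lastCheck, now, false, COMMIT_IDENTIFIER)) {
                   fired++;
                   lastCheck = now; // assumption: a completed check restarts the interval
               }
           }
           return fired;
       }

       public static void main(String[] args) {
           System.out.println("per round, flag off: " + perRoundCommitter(false)); // 0
           System.out.println("per round, flag on:  " + perRoundCommitter(true));  // 480
           System.out.println("long-lived:          " + longLivedCommitter());    // 22
       }
   }
   ```

   With the option off, only the long-lived object ever fires, through condition (2). With it on, condition (3) fires on every round, because the Tiering Service always commits with `Long.MAX_VALUE`.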
   
   
   ### Willingness to contribute
   
   - [x] I'm willing to submit a PR!

