Copilot commented on code in PR #2815:
URL: https://github.com/apache/fluss/pull/2815#discussion_r2904264015


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -135,21 +138,25 @@ public class LakeTableTieringManager implements AutoCloseable {
     // table_id -> delayed tiering task
     private final Map<Long, DelayedTiering> delayedTieringByTableId;
 
-    private final Lock lock = new ReentrantLock();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();

Review Comment:
   The gauges acquire a read lock on every metrics scrape. With a non-fair 
`ReentrantReadWriteLock`, frequent metric polling can delay or starve write 
operations (state transitions) under contention. A more robust approach is to 
maintain lock-free counters (e.g., `AtomicInteger` / `LongAdder`) updated 
inside the existing write-locked transitions, and have gauges read those 
counters without taking locks; alternatively, construct the RW lock as fair 
(`new ReentrantReadWriteLock(true)`) to reduce starvation risk.
   ```suggestion
       private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
   ```
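The suggestion above keeps the read/write split and only switches the lock to fair mode. Per the `ReentrantReadWriteLock` Javadoc, a thread requesting the fair read lock (non-reentrantly) blocks while a writer is waiting, so a stream of gauge reads cannot keep a queued state transition waiting indefinitely. The trade-off is lower throughput than the default non-fair mode. The Java sketch below shows the arrangement in isolation. `FairLockSketch`, `addPending`, `pendingTablesCount`, and the two static helpers are hypothetical stand-ins for the manager state and for the `inReadLock`/`inWriteLock` helpers used in the diff, whose real signatures may differ.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

class FairLockSketch {
    // Fair mode: when a writer is queued, new (non-reentrant) read-lock
    // requests wait behind it instead of barging ahead.
    private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final Queue<Long> pendingTieringTables = new ArrayDeque<>();

    // Hypothetical helper with the same shape as inReadLock(lock, supplier) in the diff.
    static <T> T inReadLock(ReadWriteLock lock, Supplier<T> action) {
        lock.readLock().lock();
        try {
            return action.get();
        } finally {
            lock.readLock().unlock();
        }
    }

    // Hypothetical helper with the same shape as inWriteLock(lock, supplier) in the diff.
    static <T> T inWriteLock(ReadWriteLock lock, Supplier<T> action) {
        lock.writeLock().lock();
        try {
            return action.get();
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Gauge path: shared read lock, so concurrent scrapes do not block each other.
    int pendingTablesCount() {
        return inReadLock(lock, pendingTieringTables::size);
    }

    // State transition: exclusive write lock.
    void addPending(long tableId) {
        inWriteLock(lock, () -> pendingTieringTables.add(tableId));
    }

    // Returns null when there are no pending tables.
    Long requestTable() {
        return inWriteLock(lock, pendingTieringTables::poll);
    }

    public static void main(String[] args) {
        FairLockSketch manager = new FairLockSketch();
        manager.addPending(1L);
        System.out.println(manager.pendingTablesCount()); // 1
        System.out.println(manager.requestTable());       // 1
        System.out.println(manager.pendingTablesCount()); // 0
    }
}
```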



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -165,10 +172,21 @@ protected LakeTableTieringManager(
         this.tableTierEpoch = new HashMap<>();
         this.tableLastTieredTime = new HashMap<>();
         this.delayedTieringByTableId = new HashMap<>();
+        this.tieringMetricGroup = lakeTieringMetricGroup;
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        tieringMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT,
+                () -> inReadLock(lock, pendingTieringTables::size));
+        tieringMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT,
+                () -> inReadLock(lock, liveTieringTableIds::size));
     }

Review Comment:
   The gauges acquire a read lock on every metrics scrape. With a non-fair 
`ReentrantReadWriteLock`, frequent metric polling can delay or starve write 
operations (state transitions) under contention. A more robust approach is to 
maintain lock-free counters (e.g., `AtomicInteger` / `LongAdder`) updated 
inside the existing write-locked transitions, and have gauges read those 
counters without taking locks; alternatively, construct the RW lock as fair 
(`new ReentrantReadWriteLock(true)`) to reduce starvation risk.
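The lock-free alternative from the comment can be sketched as follows. This assumes that the queue and set shown here stand in for `pendingTieringTables` and `liveTieringTableIds` (their real types in `LakeTableTieringManager` may differ), and that a plain `Map` of suppliers stands in for the metric group's gauge registration. `CounterGaugeSketch`, `markPending`, and `finishTiering` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

class CounterGaugeSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Queue<Long> pendingTieringTables = new ArrayDeque<>();
    private final Set<Long> liveTieringTableIds = new HashSet<>();

    // Mirrors of the collection sizes: written only while holding the write lock,
    // read by the gauges without taking any lock.
    private final AtomicInteger pendingCount = new AtomicInteger();
    private final AtomicInteger runningCount = new AtomicInteger();

    // Hypothetical stand-in for the metric group: gauge name -> value supplier.
    final Map<String, Supplier<Integer>> gauges = new LinkedHashMap<>();

    CounterGaugeSketch() {
        gauges.put("pendingTablesCount", pendingCount::get);
        gauges.put("runningTablesCount", runningCount::get);
    }

    void markPending(long tableId) {
        lock.writeLock().lock();
        try {
            pendingTieringTables.add(tableId);
            pendingCount.set(pendingTieringTables.size());
        } finally {
            lock.writeLock().unlock();
        }
    }

    Long requestTable() {
        lock.writeLock().lock();
        try {
            Long tableId = pendingTieringTables.poll();
            if (tableId == null) {
                return null; // No pending tables; return directly.
            }
            liveTieringTableIds.add(tableId);
            pendingCount.set(pendingTieringTables.size());
            runningCount.set(liveTieringTableIds.size());
            return tableId;
        } finally {
            lock.writeLock().unlock();
        }
    }

    void finishTiering(long tableId) {
        lock.writeLock().lock();
        try {
            liveTieringTableIds.remove(tableId);
            runningCount.set(liveTieringTableIds.size());
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        CounterGaugeSketch manager = new CounterGaugeSketch();
        manager.markPending(1L);
        manager.markPending(2L);
        manager.requestTable();
        // Simulated scrape: no lock is taken here.
        // Prints "pendingTablesCount = 1" then "runningTablesCount = 1".
        manager.gauges.forEach((name, gauge) -> System.out.println(name + " = " + gauge.get()));
    }
}
```

Each transition recomputes its counter from the collection size rather than incrementing or decrementing it, so the mirror cannot drift if an operation turns out to be a no-op. A scrape may see a value that lags the latest transition, and the two gauges are not read atomically as a pair, which is usually acceptable for gauges. Because the gauges no longer touch the lock, an exclusive `Lock` would likely suffice for the transitions.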



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -313,11 +331,11 @@ protected void checkTieringServiceTimeout() {
 
     @Nullable
     public LakeTieringTableInfo requestTable() {
-        return inLock(
+        return inWriteLock(
                 lock,
                 () -> {
                     Long tableId = pendingTieringTables.poll();
-                    // no any pending table, return directly
+                    // now no any pending table, return directly

Review Comment:
   The comment is grammatically incorrect and a bit hard to read. Consider 
changing it to something like: 'No pending tables; return directly.'
   ```suggestion
                       // No pending tables; return directly.
   ```



##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java:
##########
@@ -238,6 +240,44 @@ void testTieringFail() {
         assertRequestTable(tableId1, tablePath1, 2);
     }
 
+    @Test
+    void testGlobalMetrics() throws Exception {
+        // Initially no tables - verify counts are 0
+        assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(0);
+        assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(0);
+
+        // Add a table
+        long tableId1 = 1L;
+        TablePath tablePath1 = TablePath.of("db", "table1");
+        TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10));
+        tableTieringManager.addNewLakeTable(tableInfo1);
+
+        // Advance time to make it pending - need to wait for timer to trigger
+        manualClock.advanceTime(Duration.ofSeconds(10));
+
+        // Wait for the delayed task to execute and move to pending
+        waitValue(
+                () ->
+                        tableTieringManager.getPendingTablesCount() == 1
+                                ? Optional.of(1)
+                                : Optional.empty(),
+                Duration.ofSeconds(5),

Review Comment:
   This test waits up to 5 seconds of wall-clock time for an async timer task, 
which can make it slow or flaky in CI. If the test setup uses a 
controllable scheduler/timer (e.g., a manually triggered executor/timer already 
used elsewhere in this test class), prefer triggering the scheduled timer tasks 
deterministically after advancing `manualClock` instead of polling with a 
real-time timeout.
   ```suggestion
                   Duration.ofMillis(100),
   ```
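One way to make the wait deterministic, as the comment proposes, is to drive the delayed-tiering timer from the same manual time source, so that advancing time runs due tasks synchronously on the test thread. The sketch below assumes such a timer exists or can be injected. `ManualTimer`, `schedule`, and `advanceTime` are hypothetical and are not taken from Fluss's test utilities, and the `AtomicInteger` stands in for `getPendingTablesCount()`.

```java
import java.time.Duration;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ManualTimerSketch {

    static final class ScheduledTask {
        final long deadlineMillis;
        final Runnable action;

        ScheduledTask(long deadlineMillis, Runnable action) {
            this.deadlineMillis = deadlineMillis;
            this.action = action;
        }
    }

    // Hypothetical timer with manual time: nothing runs until advanceTime() is called.
    static final class ManualTimer {
        private long nowMillis;
        private final PriorityQueue<ScheduledTask> tasks =
                new PriorityQueue<>((a, b) -> Long.compare(a.deadlineMillis, b.deadlineMillis));

        void schedule(Duration delay, Runnable action) {
            tasks.add(new ScheduledTask(nowMillis + delay.toMillis(), action));
        }

        void advanceTime(Duration delta) {
            nowMillis += delta.toMillis();
            // Fire every task whose deadline has been reached, in deadline order,
            // on the calling thread.
            while (!tasks.isEmpty() && tasks.peek().deadlineMillis <= nowMillis) {
                tasks.poll().action.run();
            }
        }
    }

    public static void main(String[] args) {
        ManualTimer timer = new ManualTimer();
        AtomicInteger pendingTablesCount = new AtomicInteger();
        // Stand-in for addNewLakeTable(): move the table to "pending" after 10 seconds.
        timer.schedule(Duration.ofSeconds(10), pendingTablesCount::incrementAndGet);

        timer.advanceTime(Duration.ofSeconds(9));
        System.out.println(pendingTablesCount.get()); // 0
        timer.advanceTime(Duration.ofSeconds(1));
        System.out.println(pendingTablesCount.get()); // 1
    }
}
```

With a timer like this, the assertion can follow the time advance directly, with no `waitValue` polling and no wall-clock timeout to tune.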



-- 
This is an automated message from the Apache Git Service.