luoyuxia commented on code in PR #2454:
URL: https://github.com/apache/fluss/pull/2454#discussion_r2916325846


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -135,18 +152,21 @@ public void 
processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
 
         if (committableWriteResults != null) {
             try {
-                Committable committable =
+                CommitResult commitResult =
                         commitWriteResults(
                                 tableId,
                                 tableBucketWriteResult.tablePath(),
                                 committableWriteResults);
-                // only emit when committable is not-null
-                if (committable != null) {
-                    output.collect(new StreamRecord<>(new 
CommittableMessage<>(committable)));
+                // only emit downstream when actual data was written
+                if (commitResult.committable != null) {
+                    output.collect(
+                            new StreamRecord<>(new 
CommittableMessage<>(commitResult.committable)));
                 }
-                // notify that the table id has been finished tier
+                // always notify the coordinator that this table's tiering 
round is done,
+                // even for empty commits — otherwise the coordinator will 
keep waiting

Review Comment:
   why notify for empty commits? what "coordinator will keep waiting" means?



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java:
##########
@@ -153,6 +183,40 @@ public LakeCommitResult commit(
         }
     }
 
+    /**
+     * Computes cumulative table stats from the snapshot's base manifest list.
+     *
+     * <p>The base manifest list covers all live data files in the snapshot 
(main branch only,
+     * expired snapshots excluded). ADD entries increase the total; DELETE 
entries (produced by
+     * compaction) decrease it, yielding the net live file size and physical 
row count.
+     *
+     * <p>For primary-key tables the physical row count may include 
un-compacted delete rows at L0
+     * before a full compaction is completed.
+     *
+     * @return {@code long[]{totalFileSize, totalRowCount}}
+     */
+    @VisibleForTesting
+    static long[] computeTableStats(FileStore<?> store, Snapshot snapshot) {

Review Comment:
   Why not make it return TieringStats dierctly?



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -162,28 +182,31 @@ public void 
processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
         }
     }
 
-    @Nullable
-    private Committable commitWriteResults(
+    /**
+     * Commits the collected write results for one table to the lake and Fluss.
+     *
+     * <p>Always returns a non-null {@link CommitResult}. When all buckets 
produced no data (empty
+     * commit), {@link CommitResult#committable} is {@code null} and stats are 
{@link
+     * TieringStats#UNKNOWN}.
+     */
+    private CommitResult commitWriteResults(
             long tableId,
             TablePath tablePath,
             List<TableBucketWriteResult<WriteResult>> committableWriteResults)
             throws Exception {
-        // filter out non-null write result
-        committableWriteResults =
+        // filter down to buckets that actually produced data
+        List<TableBucketWriteResult<WriteResult>> nonEmptyResults =
                 committableWriteResults.stream()
-                        .filter(
-                                writeResultTableBucketWriteResult ->
-                                        
writeResultTableBucketWriteResult.writeResult() != null)
+                        .filter(r -> r.writeResult() != null)
                         .collect(Collectors.toList());
 
-        // empty, means all write result is null, which is a empty commit,
-        // return null to skip the empty commit
-        if (committableWriteResults.isEmpty()) {
+        // all buckets were empty — nothing to commit to the lake
+        if (nonEmptyResults.isEmpty()) {
             LOG.info(
                     "Commit tiering write results is empty for table {}, table 
path {}",
                     tableId,
                     tablePath);
-            return null;
+            return new CommitResult(null, TieringStats.UNKNOWN);

Review Comment:
   for empty commit, will the unkonwn tiering stats make the metric value drop 
to -1?



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java:
##########
@@ -153,6 +183,40 @@ public LakeCommitResult commit(
         }
     }
 
+    /**
+     * Computes cumulative table stats from the snapshot's base manifest list.
+     *
+     * <p>The base manifest list covers all live data files in the snapshot 
(main branch only,
+     * expired snapshots excluded). ADD entries increase the total; DELETE 
entries (produced by
+     * compaction) decrease it, yielding the net live file size and physical 
row count.
+     *
+     * <p>For primary-key tables the physical row count may include 
un-compacted delete rows at L0
+     * before a full compaction is completed.
+     *
+     * @return {@code long[]{totalFileSize, totalRowCount}}
+     */
+    @VisibleForTesting
+    static long[] computeTableStats(FileStore<?> store, Snapshot snapshot) {
+        ManifestList manifestList = store.manifestListFactory().create();
+        ManifestFile manifestFile = store.manifestFileFactory().create();
+        List<ManifestFileMeta> manifestFileMetas = 
manifestList.readDataManifests(snapshot);
+        long totalFileSize = 0L;
+        long totalRowCount = 0L;
+        for (ManifestFileMeta manifestFileMeta : manifestFileMetas) {

Review Comment:
   I suggest not to parse manifest to get records count since it's not correct, 
condiering the primary key case or deletion vector.
   I suggest to use Catalog#loadSnapshot since it's correct and align with the 
number of Paimon Rest Catalog.
   But note only rest catalog support it, you will need to handle it. 
   It's fine only support rest catalog  



##########
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java:
##########
@@ -29,6 +34,8 @@ public class LakeTieringMetricGroup extends 
AbstractMetricGroup {
 
     private static final String NAME = "lakeTiering";
 
+    private final Map<Long, TableMetricGroup> metricGroupByTable = 
MapUtils.newConcurrentHashMap();

Review Comment:
   nit:
   Seem it's not to be concurrent map? Can just be map?



##########
fluss-common/src/main/java/org/apache/fluss/lake/committer/TieringStats.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.committer;
+
+import java.io.Serializable;
+
+/**
+ * Immutable statistics for a single completed tiering round of a lake table.
+ *
+ * <p>All long fields use {@code -1} as the sentinel value meaning "unknown / 
not supported".
+ */
+public final class TieringStats implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * A {@code TieringStats} instance where every field is {@code -1} 
(unknown/unsupported). Use
+     * this as the default when no stats are available.
+     */
+    public static final TieringStats UNKNOWN = new TieringStats(-1L, -1L);
+
+    // 
-----------------------------------------------------------------------------------------
+    // Lake data stats (reported by the lake committer)
+    // 
-----------------------------------------------------------------------------------------
+
+    /** Cumulative total file size (bytes) of the lake table after this 
tiering round. */
+    private final long fileSize;

Review Comment:
   can we make it nullable to use null to represent unknow fileSize, so that 
the rpc can also be optional



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -645,4 +770,39 @@ protected int getPendingTablesCount() {
     protected int getRunningTablesCount() {
         return inReadLock(lock, liveTieringTableIds::size);
     }
+
+    @VisibleForTesting
+    protected Long getTableLastSuccessTime(long tableId) {
+        return inReadLock(
+                lock,
+                () -> {
+                    LastTieringResult r = lastTieringResult.get(tableId);
+                    return r != null ? r.tieredTime : null;
+                });
+    }
+
+    @VisibleForTesting
+    protected Long getTableLastDuration(long tableId) {
+        return inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.tierDuration));
+    }
+
+    @VisibleForTesting
+    protected Long getTableLastFileSize(long tableId) {
+        return inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.fileSize));
+    }
+
+    @VisibleForTesting
+    protected Long getTableLastRecordCount(long tableId) {
+        return inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.recordCount));
+    }
+
+    @VisibleForTesting
+    protected Long getTableFailureCount(long tableId) {
+        return inReadLock(lock, () -> tableTieringFailureCount.get(tableId));
+    }
+
+    @VisibleForTesting
+    protected TieringState getTableState(long tableId) {

Review Comment:
   nit:
   get warning from my ide:
   ```
   Class 'TieringState' is exposed outside its defined visibility scope 
   ```



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java:
##########
@@ -141,12 +153,47 @@ public LakeCommitResult commit(
                     snapshotId = rewriteCommitSnapshotId;
                 }
             }
-            return LakeCommitResult.committedIsReadable(snapshotId);
+
+            // collect cumulative table stats from the committed snapshot's 
summary.
+            // Use the specific committed snapshotId rather than 
currentSnapshot() to avoid
+            // reading a different snapshot in case of concurrent commits.
+            Snapshot committedSnapshot = icebergTable.snapshot(snapshotId);
+            long[] stats = extractSnapshotStats(committedSnapshot);
+            return LakeCommitResult.committedIsReadable(snapshotId, stats[0], 
stats[1]);
         } catch (Exception e) {
             throw new IOException("Failed to commit to Iceberg table.", e);
         }
     }
 
+    /**
+     * Extracts cumulative file size and record count from an Iceberg 
snapshot's summary map.
+     *
+     * <p>Stats collection is best-effort: if the snapshot is null, the 
summary is missing, or a
+     * value cannot be parsed as a long, the corresponding stat is left as 
{@code -1} and a warning
+     * is logged instead of propagating an exception.
+     *
+     * @param snapshot the committed Iceberg snapshot, may be {@code null}
+     * @return a two-element array {@code [totalFileSize, totalRecordCount]}, 
with {@code -1}
+     *     indicating an unknown value
+     */
+    private long[] extractSnapshotStats(@Nullable Snapshot snapshot) {

Review Comment:
   I'm curious about it.
   For log table, that's fine .
   But for primary table, according to iceberg code, if it has deletion file, 
the size related metrics won't be update. So, will it make the metric look 
werid, drop to -1 once one round of tiering meet  deletion file?



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -234,21 +250,76 @@ private void scheduleTableTiering(long tableId) {
         if (existingDelayedTiering != null) {
             existingDelayedTiering.cancel();
         }
-        long delayMs = freshnessInterval - (clock.milliseconds() - 
lastTieredTime);
+        long delayMs = freshnessInterval - (clock.milliseconds() - 
lastResult.tieredTime);
         // if the delayMs is < 0, the DelayedTiering will be triggered at once 
without
         // adding into timing wheel.
         DelayedTiering delayedTiering = new DelayedTiering(tableId, delayMs);
         delayedTieringByTableId.put(tableId, delayedTiering);
         lakeTieringScheduleTimer.add(delayedTiering);
     }
 
+    private void registerTableMetrics(long tableId, TablePath tablePath) {
+        // create table-level metric group
+        MetricGroup tableMetricGroup =
+                tieringMetricGroup.addTableLakeTieringMetricGroup(tableId, 
tablePath);
+
+        // tierLag: milliseconds since last successful tiering
+        tableMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_TABLE_TIER_LAG,
+                () ->
+                        inReadLock(
+                                lock,
+                                () -> {
+                                    LastTieringResult r = 
lastTieringResult.get(tableId);
+                                    return r != null ? clock.milliseconds() - 
r.tieredTime : -1L;
+                                }));
+
+        // tierDuration: duration of last tiering job
+        tableMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_TABLE_TIER_DURATION,
+                () -> inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.tierDuration)));
+
+        // failuresTotal: total failure count for this table

Review Comment:
   can we use `counter` for `failuresTotal` since `counter` is suitable for the 
accumlate value



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -274,7 +276,9 @@ public void handleSourceEvent(int subtaskId, SourceEvent 
sourceEvent) {
                 boolean isForceFinished = 
tieringReachMaxDurationsTables.remove(finishedTableId);
                 LOG.info("Before finishedTables table {}.", finishedTables);

Review Comment:
   could you please help remove 
   ```
    LOG.info("Before finishedTables table {}.", finishedTables);
   ```
   ```
   LOG.info("After finishedTables table {}.", finishedTables);
   ```
   in this pr?



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -645,4 +770,39 @@ protected int getPendingTablesCount() {
     protected int getRunningTablesCount() {
         return inReadLock(lock, liveTieringTableIds::size);
     }
+
+    @VisibleForTesting
+    protected Long getTableLastSuccessTime(long tableId) {
+        return inReadLock(
+                lock,
+                () -> {
+                    LastTieringResult r = lastTieringResult.get(tableId);
+                    return r != null ? r.tieredTime : null;
+                });
+    }
+
+    @VisibleForTesting
+    protected Long getTableLastDuration(long tableId) {
+        return inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.tierDuration));
+    }
+
+    @VisibleForTesting
+    protected Long getTableLastFileSize(long tableId) {
+        return inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.fileSize));
+    }
+
+    @VisibleForTesting
+    protected Long getTableLastRecordCount(long tableId) {

Review Comment:
   the method return Long which always means nullable.
   but getLastResultField always return long which mean non-nullable, can we 
align them.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -645,4 +770,39 @@ protected int getPendingTablesCount() {
     protected int getRunningTablesCount() {
         return inReadLock(lock, liveTieringTableIds::size);
     }
+
+    @VisibleForTesting
+    protected Long getTableLastSuccessTime(long tableId) {
+        return inReadLock(
+                lock,
+                () -> {
+                    LastTieringResult r = lastTieringResult.get(tableId);
+                    return r != null ? r.tieredTime : null;
+                });
+    }
+
+    @VisibleForTesting
+    protected Long getTableLastDuration(long tableId) {
+        return inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.tierDuration));
+    }
+
+    @VisibleForTesting
+    protected Long getTableLastFileSize(long tableId) {
+        return inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.fileSize));
+    }
+
+    @VisibleForTesting
+    protected Long getTableLastRecordCount(long tableId) {

Review Comment:
   seesm it only used in testing code, can testing code just call 
`getLastResultField` directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to