This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a8832866b1 [enhance](paimon)Doris Paimon Scan Metrics Integration 
(#59281)
9a8832866b1 is described below

commit 9a8832866b1bd781fa225d0aa56565288c81b002
Author: Chenjunwei <[email protected]>
AuthorDate: Mon Dec 29 10:36:39 2025 +0800

    [enhance](paimon)Doris Paimon Scan Metrics Integration (#59281)
    
    ### What problem does this PR solve?
    
    #### Overview
    
    This change pipes Apache Paimon scan metrics directly into Doris query
    profiles so operators can inspect per-scan statistics (manifests, file
    counts, scan durations, manifest cache hits/misses) from the FE profile
    UI. The integration consists of three pieces:
    
    - Summary Profile Slot - SummaryProfile now includes a Paimon Scan
    Metrics entry in the execution summary list so the
      FE profile table reserves space for the Paimon telemetry.
    - Metric Registry + Reporter - a new PaimonMetricRegistry collects
    Paimon MetricGroups, and PaimonScanMetricsReporter
    formats ScanMetrics values and appends them to the summary entry
    whenever a Paimon scan runs.
    - Scan Integration - PaimonScanNode attaches a registry to the TableScan
    (when InnerTableScan supports
    withMetricsRegistry), plans splits, and reports metrics after planning.
    
    All metrics remain scoped to Paimon scans; other table formats are
    untouched and still populate their own
    runtime-profile sections as before.
    
    #### Implementation Details
    
    1. SummaryProfile
    
    
    
`fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java`
    
    - Added the constant `PAIMON_SCAN_METRICS = "Paimon Scan Metrics"`.
    - Inserted the key into `EXECUTION_SUMMARY_KEYS` and registered it with
      indentation level 3, so the runtime profile tree nests it under the
      scheduling block when present.
    - No default text is shown unless metrics are actually reported; the
    entry stays N/A otherwise.
    
    2. PaimonMetricRegistry
    
    
    
`fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonMetricRegistry.java`
    
    - Extends `MetricRegistry` and caches `MetricGroup` instances keyed by
    `name:table`.
    - Uses the `table` tag to disambiguate groups
    (`MetricRegistry.KEY_TABLE` is private in Paimon).
    - Exposes `getGroup`/`removeGroup`/`clear` helpers (plus the
      `getAllGroups`/`getAllGroupsAsMap` accessors) to manage the registry's
      lifecycle.
    
    3. PaimonScanMetricsReporter
    
    
    
`fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonScanMetricsReporter.java`
    
    - Looks up the `ScanMetrics.GROUP_NAME` metric group in the registry and
      appends a per-scan entry under "Paimon Scan Metrics" in the
      SummaryProfile.
    - Metrics covered: `last_scan_duration`, `scan_duration`
    (count/mean/p95/max), `last_scanned_manifests`,
    `last_scan_skipped_table_files`, `last_scan_resulted_table_files`,
    `manifest_hit_cache`, `manifest_missed_cache`.
    - If the direct lookup fails, falls back to the only ScanMetrics group in
      the registry; if several ScanMetrics groups exist, reporting is skipped.
    - Cleans up the registry group after reporting.
    
    4. PaimonScanNode
    
    
    
`fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java`
    
    - Creates a `TableScan` via
    `newReadBuilder().withFilter().withProjection().newScan()`.
    - When scan is an `InnerTableScan`, attaches `PaimonMetricRegistry` with
    `withMetricsRegistry`.
    - Plans splits, reports metrics, and clears the registry to avoid leaks.
    
    5. Regression test
    
    
    
`regression-test/suites/external_table_p0/paimon/test_paimon_scan_metrics_profile.groovy`
    
    - Adds a Paimon scan metrics profile case that asserts "Paimon Scan
    Metrics" appears in the query profile.
    
    #### Example Profile Output
    
    After running a query against `test_paimon_scan_metrics.db1.all_table`,
    the FE profile shows:
    
    ```
      Paimon Scan Metrics:
        Table Scan (test_paimon_scan_metrics.db1.all_table):
           - last_scan_duration: 7ms
           - scan_duration: count=1, mean=7ms, p95=7ms, max=7ms
           - last_scanned_manifests: 1
           - last_scan_skipped_table_files: 0
           - last_scan_resulted_table_files: 3
           - manifest_hit_cache: 0
           - manifest_missed_cache: 1
    ```
---
 .../doris/common/profile/SummaryProfile.java       |   3 +
 .../paimon/profile/PaimonMetricRegistry.java       |  72 ++++++++++
 .../paimon/profile/PaimonScanMetricsReporter.java  | 152 +++++++++++++++++++++
 .../datasource/paimon/source/PaimonScanNode.java   |  19 ++-
 4 files changed, 243 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 004f2042c32..86a4e63c801 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -133,6 +133,7 @@ public class SummaryProfile {
     public static final String LATENCY_FROM_BE_TO_FE = "RPC Latency From BE To 
FE";
     public static final String SPLITS_ASSIGNMENT_WEIGHT = "Splits Assignment 
Weight";
     public static final String ICEBERG_SCAN_METRICS = "Iceberg Scan Metrics";
+    public static final String PAIMON_SCAN_METRICS = "Paimon Scan Metrics";
 
     // These info will display on FE's web ui table, every one will be 
displayed as
     // a column, so that should not
@@ -166,6 +167,7 @@ public class SummaryProfile {
             SINK_SET_PARTITION_VALUES_TIME,
             CREATE_SCAN_RANGE_TIME,
             ICEBERG_SCAN_METRICS,
+            PAIMON_SCAN_METRICS,
             NEREIDS_DISTRIBUTE_TIME,
             GET_META_VERSION_TIME,
             GET_PARTITION_VERSION_TIME,
@@ -218,6 +220,7 @@ public class SummaryProfile {
             .put(SINK_SET_PARTITION_VALUES_TIME, 3)
             .put(CREATE_SCAN_RANGE_TIME, 2)
             .put(ICEBERG_SCAN_METRICS, 3)
+            .put(PAIMON_SCAN_METRICS, 3)
             .put(GET_PARTITION_VERSION_TIME, 1)
             .put(GET_PARTITION_VERSION_COUNT, 1)
             .put(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, 1)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonMetricRegistry.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonMetricRegistry.java
new file mode 100644
index 00000000000..ba229fa61dd
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonMetricRegistry.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon.profile;
+
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricGroupImpl;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A Paimon {@link MetricRegistry} that remembers every {@link MetricGroup} it creates, so Doris can read the
+ * collected metrics back after a scan has been planned.
+ *
+ * <p>Groups are keyed by {@code name + ":" + table}, where {@code table} is the value of the {@code "table"}
+ * tag passed to {@link #createMetricGroup} (empty when the tag is absent or null). Storage is a
+ * {@link ConcurrentHashMap}.
+ */
+public class PaimonMetricRegistry extends MetricRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(PaimonMetricRegistry.class);
+    // Mirrors Paimon's MetricRegistry.KEY_TABLE, which is private and cannot be referenced from here.
+    private static final String TABLE_TAG_KEY = "table";
+    private final ConcurrentHashMap<String, MetricGroup> groups = new ConcurrentHashMap<>();
+
+    /**
+     * Creates a group for Paimon and records it under {@code name:table} for later lookup.
+     * A later group with the same name and table replaces the earlier entry.
+     *
+     * @param name group name
+     * @param tags group tags; may be {@code null}
+     * @return the newly created group
+     */
+    @Override
+    protected MetricGroup createMetricGroup(String name, Map<String, String> tags) {
+        MetricGroup group = new MetricGroupImpl(name, tags);
+        String table = tags == null ? null : tags.get(TABLE_TAG_KEY);
+        if (table == null) {
+            // Covers both a missing tag and a tag explicitly mapped to null (getOrDefault would return null
+            // for the latter and produce a "name:null" key).
+            table = "";
+        }
+        groups.put(buildKey(name, table), group);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created metric group: {}:{}", name, table);
+        }
+        return group;
+    }
+
+    /**
+     * Returns the group registered under {@code name:table}, or {@code null} if there is none.
+     *
+     * <p>A miss is not necessarily an error (the table tag Paimon used may differ from the name a caller
+     * looks up), so it is logged at debug level rather than as a warning.
+     */
+    public MetricGroup getGroup(String name, String table) {
+        String key = buildKey(name, table);
+        MetricGroup group = groups.get(key);
+        if (group == null && LOG.isDebugEnabled()) {
+            LOG.debug("MetricGroup not found: {}", key);
+        }
+        return group;
+    }
+
+    /** Forgets the group registered under {@code name:table}; does nothing if it is absent. */
+    public void removeGroup(String name, String table) {
+        groups.remove(buildKey(name, table));
+    }
+
+    /** Returns a live view of all registered groups; it reflects later additions and removals. */
+    public Collection<MetricGroup> getAllGroups() {
+        return groups.values();
+    }
+
+    /** Returns a point-in-time copy of all registered groups, keyed by {@code name:table}. */
+    public Map<String, MetricGroup> getAllGroupsAsMap() {
+        return new ConcurrentHashMap<>(groups);
+    }
+
+    /** Removes every registered group. */
+    public void clear() {
+        groups.clear();
+    }
+
+    private static String buildKey(String name, String table) {
+        return name + ":" + table;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonScanMetricsReporter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonScanMetricsReporter.java
new file mode 100644
index 00000000000..b76cf74dfda
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonScanMetricsReporter.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon.profile;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.profile.RuntimeProfile;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.paimon.metrics.Counter;
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.HistogramStatistics;
+import org.apache.paimon.metrics.Metric;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.operation.metrics.ScanMetrics;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class PaimonScanMetricsReporter {
+    private static final double P95 = 0.95d;
+
+    public static void report(TableIf table, String paimonTableName, 
PaimonMetricRegistry registry) {
+        if (registry == null || paimonTableName == null) {
+            return;
+        }
+        String resolvedTableName = paimonTableName;
+        MetricGroup group = registry.getGroup(ScanMetrics.GROUP_NAME, 
paimonTableName);
+        if (group == null) {
+            String prefix = ScanMetrics.GROUP_NAME + ":";
+            for (Map.Entry<String, MetricGroup> entry : 
registry.getAllGroupsAsMap().entrySet()) {
+                String key = entry.getKey();
+                if (!key.startsWith(prefix)) {
+                    continue;
+                }
+                if (group != null) {
+                    group = null;
+                    break;
+                }
+                group = entry.getValue();
+                resolvedTableName = key.substring(prefix.length());
+            }
+        }
+        if (group == null) {
+            return;
+        }
+        Map<String, Metric> metrics = group.getMetrics();
+        if (metrics == null || metrics.isEmpty()) {
+            return;
+        }
+
+        SummaryProfile summaryProfile = 
SummaryProfile.getSummaryProfile(ConnectContext.get());
+        if (summaryProfile == null) {
+            return;
+        }
+        RuntimeProfile executionSummary = summaryProfile.getExecutionSummary();
+        if (executionSummary == null) {
+            return;
+        }
+
+        RuntimeProfile paimonGroup = 
executionSummary.getChildMap().get(SummaryProfile.PAIMON_SCAN_METRICS);
+        if (paimonGroup == null) {
+            paimonGroup = new 
RuntimeProfile(SummaryProfile.PAIMON_SCAN_METRICS);
+            executionSummary.addChild(paimonGroup, true);
+        }
+
+        String displayName = table == null ? paimonTableName : 
table.getNameWithFullQualifiers();
+        RuntimeProfile scanProfile = new RuntimeProfile("Table Scan (" + 
displayName + ")");
+        appendDuration(scanProfile, metrics, ScanMetrics.LAST_SCAN_DURATION, 
"last_scan_duration");
+        appendHistogram(scanProfile, metrics, ScanMetrics.SCAN_DURATION, 
"scan_duration");
+        appendCounter(scanProfile, metrics, 
ScanMetrics.LAST_SCANNED_MANIFESTS, "last_scanned_manifests");
+        appendCounter(scanProfile, metrics, 
ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES,
+                "last_scan_skipped_table_files");
+        appendCounter(scanProfile, metrics, 
ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES,
+                "last_scan_resulted_table_files");
+        appendCounter(scanProfile, metrics, ScanMetrics.MANIFEST_HIT_CACHE, 
"manifest_hit_cache");
+        appendCounter(scanProfile, metrics, ScanMetrics.MANIFEST_MISSED_CACHE, 
"manifest_missed_cache");
+        paimonGroup.addChild(scanProfile, true);
+        registry.removeGroup(ScanMetrics.GROUP_NAME, resolvedTableName);
+    }
+
+    private static void appendDuration(RuntimeProfile profile, Map<String, 
Metric> metrics, String metricKey,
+                                       String profileKey) {
+        Long value = getLongValue(metrics.get(metricKey));
+        if (value == null) {
+            return;
+        }
+        profile.addInfoString(profileKey, formatDuration(value));
+    }
+
+    private static void appendCounter(RuntimeProfile profile, Map<String, 
Metric> metrics, String metricKey,
+                                      String profileKey) {
+        Long value = getLongValue(metrics.get(metricKey));
+        if (value == null) {
+            return;
+        }
+        profile.addInfoString(profileKey, Long.toString(value));
+    }
+
+    private static void appendHistogram(RuntimeProfile profile, Map<String, 
Metric> metrics, String metricKey,
+                                        String profileKey) {
+        Metric metric = metrics.get(metricKey);
+        if (!(metric instanceof Histogram)) {
+            return;
+        }
+        Histogram histogram = (Histogram) metric;
+        HistogramStatistics stats = histogram.getStatistics();
+        if (stats == null) {
+            return;
+        }
+        String formatted = "count=" + histogram.getCount()
+                + ", mean=" + formatDuration(stats.getMean())
+                + ", p95=" + formatDuration(stats.getQuantile(P95))
+                + ", max=" + formatDuration(stats.getMax());
+        profile.addInfoString(profileKey, formatted);
+    }
+
+    private static Long getLongValue(Metric metric) {
+        if (metric instanceof Counter) {
+            return ((Counter) metric).getCount();
+        }
+        if (metric instanceof Gauge) {
+            Object value = ((Gauge) metric).getValue();
+            if (value instanceof Number) {
+                return ((Number) value).longValue();
+            }
+        }
+        return null;
+    }
+
+    private static String formatDuration(double nanos) {
+        long ms = TimeUnit.NANOSECONDS.toMillis(Math.round(nanos));
+        return DebugUtil.getPrettyStringMs(ms);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 85aa0bdc96c..92b0862d06e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -34,6 +34,8 @@ import 
org.apache.doris.datasource.credentials.VendedCredentialsFactory;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.paimon.PaimonUtil;
+import org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry;
+import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.SessionVariable;
@@ -55,8 +57,10 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -413,9 +417,19 @@ public class PaimonScanNode extends FileQueryScanNode {
                 .filter(i -> i >= 0)
                 .toArray();
         ReadBuilder readBuilder = paimonTable.newReadBuilder();
-        return readBuilder.withFilter(predicates)
+        TableScan scan = readBuilder.withFilter(predicates)
                 .withProjection(projected)
-                .newScan().plan().splits();
+                .newScan();
+        PaimonMetricRegistry registry = new PaimonMetricRegistry();
+        if (scan instanceof InnerTableScan) {
+            scan = ((InnerTableScan) scan).withMetricsRegistry(registry);
+        }
+        List<org.apache.paimon.table.source.Split> splits = 
scan.plan().splits();
+        PaimonScanMetricsReporter.report(source.getTargetTable(), 
paimonTable.name(), registry);
+        if (!registry.getAllGroups().isEmpty()) {
+            registry.clear();
+        }
+        return splits;
     }
 
     private String getFileFormat(String path) {
@@ -699,4 +713,3 @@ public class PaimonScanNode extends FileQueryScanNode {
         return baseTable;
     }
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to