This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 44e4e6f7f25 [improvement](statistics)Optimize drop stats operation (#30158)
44e4e6f7f25 is described below

commit 44e4e6f7f25ac95249d239970b689a5a9bfc708d
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Sun Jan 21 09:58:01 2024 +0800

    [improvement](statistics)Optimize drop stats operation (#30158)
---
 .../apache/doris/service/FrontendServiceImpl.java  | 12 ++++-
 .../apache/doris/statistics/AnalysisManager.java   | 61 ++++++++++++++++------
 .../doris/statistics/InvalidateStatsTarget.java    | 36 +++++++++++++
 .../apache/doris/statistics/StatisticsCache.java   | 17 ++----
 4 files changed, 93 insertions(+), 33 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 11b42913650..198fdf1d10a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -77,7 +77,10 @@ import org.apache.doris.qe.MasterCatalogExecutor;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.statistics.AnalysisManager;
+import org.apache.doris.statistics.InvalidateStatsTarget;
 import org.apache.doris.statistics.StatisticsCacheKey;
+import org.apache.doris.statistics.TableStatsMeta;
 import org.apache.doris.statistics.query.QueryStats;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
@@ -3088,8 +3091,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
     @Override
     public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest 
request) throws TException {
-        StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, 
StatisticsCacheKey.class);
-        Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId, 
k.idxId, k.colName);
+        InvalidateStatsTarget target = GsonUtils.GSON.fromJson(request.key, 
InvalidateStatsTarget.class);
+        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
+        TableStatsMeta tableStats = 
analysisManager.findTableStatsStatus(target.tableId);
+        if (tableStats == null) {
+            return new TStatus(TStatusCode.OK);
+        }
+        analysisManager.invalidateLocalStats(target.tableId, target.columns, 
tableStats);
         return new TStatus(TStatusCode.OK);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index fb7cf3ed384..742afe2957f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -59,6 +59,9 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
 import org.apache.doris.statistics.util.DBObjects;
 import org.apache.doris.statistics.util.SimpleQueue;
 import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -655,18 +658,9 @@ public class AnalysisManager implements Writable {
         if (tableStats == null) {
             return;
         }
-        if (cols == null) {
-            tableStats.reset();
-        } else {
-            dropStatsStmt.getColumnNames().forEach(tableStats::removeColumn);
-            StatisticsCache statisticsCache = 
Env.getCurrentEnv().getStatisticsCache();
-            for (String col : cols) {
-                statisticsCache.syncInvalidate(tblId, -1L, col);
-            }
-            tableStats.updatedTime = 0;
-        }
-        tableStats.userInjected = false;
-        logCreateTableStats(tableStats);
+        invalidateLocalStats(tblId, cols, tableStats);
+        // Drop stats ddl is master only operation.
+        invalidateRemoteStats(tblId, cols);
         StatisticsRepository.dropStatistics(tblId, cols);
     }
 
@@ -676,14 +670,47 @@ public class AnalysisManager implements Writable {
             return;
         }
         Set<String> cols = 
table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet());
+        invalidateLocalStats(table.getId(), cols, tableStats);
+        // Drop stats ddl is master only operation.
+        invalidateRemoteStats(table.getId(), cols);
+        StatisticsRepository.dropStatistics(table.getId(), cols);
+    }
+
+    public void invalidateLocalStats(long tableId, Set<String> columns, 
TableStatsMeta tableStats) {
+        if (tableStats == null) {
+            return;
+        }
         StatisticsCache statisticsCache = 
Env.getCurrentEnv().getStatisticsCache();
-        for (String col : cols) {
-            tableStats.removeColumn(col);
-            statisticsCache.syncInvalidate(table.getId(), -1L, col);
+        if (columns != null) {
+            for (String column : columns) {
+                tableStats.removeColumn(column);
+                statisticsCache.invalidate(tableId, -1, column);
+            }
         }
         tableStats.updatedTime = 0;
-        logCreateTableStats(tableStats);
-        StatisticsRepository.dropStatistics(table.getId(), cols);
+        tableStats.userInjected = false;
+    }
+
+    public void invalidateRemoteStats(long tableId, Set<String> columns) {
+        InvalidateStatsTarget target = new InvalidateStatsTarget(tableId, 
columns);
+        TInvalidateFollowerStatsCacheRequest request = new 
TInvalidateFollowerStatsCacheRequest();
+        request.key = GsonUtils.GSON.toJson(target);
+        StatisticsCache statisticsCache = 
Env.getCurrentEnv().getStatisticsCache();
+        SystemInfoService.HostInfo selfNode = 
Env.getCurrentEnv().getSelfNode();
+        boolean success = true;
+        for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
+            // Skip master
+            if (selfNode.equals(frontend.getHost())) {
+                continue;
+            }
+            success = success && statisticsCache.invalidateStats(frontend, 
request);
+        }
+        if (!success) {
+            // If any rpc failed, use edit log to sync table stats to 
non-master FEs.
+            LOG.warn("Failed to invalidate all remote stats by rpc for table 
{}, use edit log.", tableId);
+            TableStatsMeta tableStats = findTableStatsStatus(tableId);
+            logCreateTableStats(tableStats);
+        }
     }
 
     public void handleKillAnalyzeStmt(KillAnalysisJobStmt killAnalysisJobStmt) 
throws DdlException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java
new file mode 100644
index 00000000000..1ee7b745048
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Set;
+
+public class InvalidateStatsTarget {
+
+    @SerializedName("tableId")
+    public final long tableId;
+
+    @SerializedName("columns")
+    public final Set<String> columns;
+
+    public InvalidateStatsTarget(long tableId, Set<String> columns) {
+        this.tableId = tableId;
+        this.columns = columns;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index fbec9a60fa0..0cf2808222e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -137,19 +137,6 @@ public class StatisticsCache {
         columnStatisticsCache.synchronous().invalidate(new 
StatisticsCacheKey(tblId, idxId, colName));
     }
 
-    public void syncInvalidate(long tblId, long idxId, String colName) {
-        StatisticsCacheKey cacheKey = new StatisticsCacheKey(tblId, idxId, 
colName);
-        columnStatisticsCache.synchronous().invalidate(cacheKey);
-        TInvalidateFollowerStatsCacheRequest request = new 
TInvalidateFollowerStatsCacheRequest();
-        request.key = GsonUtils.GSON.toJson(cacheKey);
-        for (Frontend frontend : 
Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) {
-            if (StatisticsUtil.isMaster(frontend)) {
-                continue;
-            }
-            invalidateStats(frontend, request);
-        }
-    }
-
     public void updateColStatsCache(long tblId, long idxId, String colName, 
ColumnStatistic statistic) {
         columnStatisticsCache.synchronous().put(new StatisticsCacheKey(tblId, 
idxId, colName), Optional.of(statistic));
     }
@@ -261,7 +248,7 @@ public class StatisticsCache {
     }
 
     @VisibleForTesting
-    public void invalidateStats(Frontend frontend, 
TInvalidateFollowerStatsCacheRequest request) {
+    public boolean invalidateStats(Frontend frontend, 
TInvalidateFollowerStatsCacheRequest request) {
         TNetworkAddress address = new TNetworkAddress(frontend.getHost(), 
frontend.getRpcPort());
         FrontendService.Client client = null;
         try {
@@ -269,11 +256,13 @@ public class StatisticsCache {
             client.invalidateStatsCache(request);
         } catch (Throwable t) {
             LOG.warn("Failed to sync invalidate to follower: {}", address, t);
+            return false;
         } finally {
             if (client != null) {
                 ClientPool.frontendPool.returnObject(address, client);
             }
         }
+        return true;
     }
 
     public void putCache(StatisticsCacheKey k, ColumnStatistic c) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to