This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c11a46dd829c5510df0e50ccf9d522586ef63d6c
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Sat Jul 20 15:46:49 2024 +0800

    [Enhancement](group commit)Optimize be select for group commit (#35558)
    
    1. Stream load and insert into, when group committed and sent to the
    master FE, now use a consistent BE selection strategy (previously,
    insert into reused the first selected BE, while stream load used round
    robin). A map <table id, be id> records a fixed BE id for each table:
    the first time a table is loaded, a BE is selected at random, and the
    <table id, be id> pair is recorded in the map. All subsequent loads
    into that table are routed to the BE recorded in the map, which
    maximizes batching on a single BE.
    To address the issue of excessive load on a single BE, a variable
    similar to a bvar window monitors the total data volume sent to a
    specific BE for a specific table during the batch interval (default
    10 seconds); a second map <be id, window variable> tracks this. If a
    new load finds that the window variable of its mapped BE is below a
    certain value (e.g., 1 GB), the load continues to be sent to that BE
    according to the first map. If it exceeds this value, the load is sent
    to another BE with the smallest window variable value, and the first
    map is updated. If every BE exceeds this value, the one with the
    smallest value is still chosen. This relieves excessive pressure on a
    single BE. (A sketch of this routing follows point 3.)
    
    2. For stream load, if group committed and sent directly to a BE, the
    data is batched on that BE and the transaction is committed at the end
    of the load. At that point, a request is sent to the FE, which records
    the size of the load and adds it to the window variable.
    
    3. Stream load and insert into requests sent to an observer FE follow
    the logic in point 1 via RPC, passing the table id to the master FE to
    obtain the selected BE id.
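    
    In sketch form, the routing in point 1 looks roughly like the
    following. This is an illustrative sketch only -- simplified names,
    no cloud mode, locking, or error handling; the committed logic is
    GroupCommitManager.selectBackendForGroupCommitInternal in the diff
    below, which re-selects a random alive BE once the window exceeds the
    table's group_commit_data_bytes threshold:
    
        // Illustrative sketch, not the committed code.
        Map<Long, Long> tableToBe = new ConcurrentHashMap<>();
        Map<Long, SlidingWindowCounter> pressure = new ConcurrentHashMap<>();
    
        long selectBeForTable(long tableId, List<Backend> aliveBes, long thresholdBytes) {
            Long cached = tableToBe.get(tableId);
            // Keep routing to the recorded BE while its windowed bytes stay under the threshold.
            if (cached != null && pressure.get(tableId).get() < thresholdBytes) {
                return cached;
            }
            // First load for this table, or window exceeded: pick a random alive BE and remember it.
            Backend be = aliveBes.get(ThreadLocalRandom.current().nextInt(aliveBes.size()));
            tableToBe.put(tableId, be.getId());
            pressure.putIfAbsent(tableId, new SlidingWindowCounter(10 + 1)); // ~10 s batch interval
            return be.getId();
        }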
---
 be/src/runtime/group_commit_mgr.cpp                |   7 +
 .../cloud/planner/CloudGroupCommitPlanner.java     |  43 +----
 .../doris/common/util/SlidingWindowCounter.java    |  73 ++++++++
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  42 +++--
 .../org/apache/doris/load/GroupCommitManager.java  | 196 +++++++++++++++++++++
 .../insert/OlapGroupCommitInsertExecutor.java      |  13 ++
 .../apache/doris/planner/GroupCommitPlanner.java   |  29 +--
 .../main/java/org/apache/doris/qe/Coordinator.java |  25 ++-
 .../java/org/apache/doris/qe/MasterOpExecutor.java |  49 ++++++
 .../apache/doris/service/FrontendServiceImpl.java  |  31 ++++
 gensrc/thrift/FrontendService.thrift               |  15 ++
 11 files changed, 446 insertions(+), 77 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 5f989da023b..30885fa1ac9 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -442,6 +442,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
         request.__set_db_id(db_id);
         request.__set_table_id(table_id);
         request.__set_txnId(txn_id);
+        request.__set_groupCommit(true);
+        request.__set_receiveBytes(state->num_bytes_load_total());
+        if (_exec_env->master_info()->__isset.backend_id) {
+            request.__set_backendId(_exec_env->master_info()->backend_id);
+        } else {
+            LOG(WARNING) << "_exec_env->master_info not set backend_id";
+        }
         if (state) {
             request.__set_commitInfos(state->tablet_commit_infos());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
index 6978ec65e72..782f78e6bc4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
@@ -20,24 +20,18 @@ package org.apache.doris.cloud.planner;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TUniqueId;
 
-import com.google.common.base.Strings;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
-import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class CloudGroupCommitPlanner extends GroupCommitPlanner {
     private static final Logger LOG = LogManager.getLogger(CloudGroupCommitPlanner.class);
@@ -50,36 +44,11 @@ public class CloudGroupCommitPlanner extends 
GroupCommitPlanner {
 
     @Override
     protected void selectBackends(ConnectContext ctx) throws DdlException {
-        backend = ctx.getInsertGroupCommit(this.table.getId());
-        if (backend != null && backend.isAlive() && !backend.isDecommissioned()
-                && backend.getCloudClusterName().equals(ctx.getCloudCluster())) {
-            return;
+        try {
+            backend = Env.getCurrentEnv().getGroupCommitManager()
+                    .selectBackendForGroupCommit(this.table.getId(), ctx, true);
+        } catch (LoadException e) {
+            throw new DdlException("No suitable backend");
         }
-
-        String cluster = ctx.getCloudCluster();
-        if (Strings.isNullOrEmpty(cluster)) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR);
-        }
-
-        // select be
-        List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster)
-                .values().stream().collect(Collectors.toList());
-        Collections.shuffle(backends);
-        for (Backend backend : backends) {
-            if (backend.isActive() && !backend.isDecommissioned()) {
-                this.backend = backend;
-                ctx.setInsertGroupCommit(this.table.getId(), backend);
-                LOG.debug("choose new be {}", backend.getId());
-                return;
-            }
-        }
-
-        List<String> backendsInfo = backends.stream()
-                .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive()
-                        + ", decommission=" + be.isDecommissioned() + " }")
-                .collect(Collectors.toList());
-        throw new DdlException("No suitable backend for cloud cluster=" + 
cluster + ", backends = " + backendsInfo);
     }
-
 }
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java
new file mode 100644
index 00000000000..787fbb06a2f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import java.util.concurrent.atomic.AtomicLongArray;
+
+public class SlidingWindowCounter {
+    private final int windowSizeInSeconds;
+    private final int numberOfBuckets;
+    private final AtomicLongArray buckets;
+    private final AtomicLongArray bucketTimestamps;
+
+    public SlidingWindowCounter(int windowSizeInSeconds) {
+        this.windowSizeInSeconds = windowSizeInSeconds;
+        this.numberOfBuckets = windowSizeInSeconds; // Each bucket represents 1 second
+        this.buckets = new AtomicLongArray(numberOfBuckets);
+        this.bucketTimestamps = new AtomicLongArray(numberOfBuckets);
+    }
+
+    private int getCurrentBucketIndex() {
+        long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
+        return (int) (currentTime % numberOfBuckets);
+    }
+
+    private void updateCurrentBucket() {
+        long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
+        int currentBucketIndex = getCurrentBucketIndex();
+        long bucketTimestamp = bucketTimestamps.get(currentBucketIndex);
+
+        if (currentTime - bucketTimestamp >= 1) {
+            buckets.set(currentBucketIndex, 0);
+            bucketTimestamps.set(currentBucketIndex, currentTime);
+        }
+    }
+
+    public void add(long value) {
+        updateCurrentBucket();
+        int bucketIndex = getCurrentBucketIndex();
+        buckets.addAndGet(bucketIndex, value);
+    }
+
+    public long get() {
+        updateCurrentBucket();
+        long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
+        long count = 0;
+
+        for (int i = 0; i < numberOfBuckets; i++) {
+            if (currentTime - bucketTimestamps.get(i) < windowSizeInSeconds) {
+                count += buckets.get(i);
+            }
+        }
+        return count;
+    }
+
+    public String toString() {
+        return String.valueOf(get());
+    }
+}
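
For reference, here is a minimal usage sketch of the SlidingWindowCounter added above (hypothetical driver code, not part of this commit):

    import org.apache.doris.common.util.SlidingWindowCounter;

    public class SlidingWindowCounterDemo {
        public static void main(String[] args) throws InterruptedException {
            // 10-second window, matching the default group commit batch interval.
            SlidingWindowCounter counter = new SlidingWindowCounter(10);
            counter.add(512L * 1024 * 1024);   // record a 512 MiB load
            counter.add(256L * 1024 * 1024);   // record a 256 MiB load
            System.out.println(counter.get()); // 805306368 (768 MiB) while within the window
            Thread.sleep(11_000);              // sleep past the window
            System.out.println(counter.get()); // 0: expired buckets are ignored
        }
    }
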
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 5767e303f35..47c495b2913 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -20,6 +20,7 @@ package org.apache.doris.httpv2.rest;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -143,11 +144,16 @@ public class LoadAction extends RestBaseController {
         String sql = request.getHeader("sql");
         LOG.info("streaming load sql={}", sql);
         boolean groupCommit = false;
+        long tableId = -1;
         String groupCommitStr = request.getHeader("group_commit");
        if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
             groupCommit = true;
             try {
                 String[] pair = parseDbAndTb(sql);
+                Database db = Env.getCurrentInternalCatalog()
+                        .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
+                Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
+                tableId = tbl.getId();
                 if (isGroupCommitBlock(pair[0], pair[1])) {
                     String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE;
                     return new RestBaseResult(msg);
@@ -165,7 +171,7 @@ public class LoadAction extends RestBaseController {
             }
 
             String label = request.getHeader(LABEL_KEY);
-            TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit);
+            TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit, tableId);
 
             LOG.info("redirect load action to destination={}, label: {}",
                     redirectAddr.toString(), label);
@@ -287,7 +293,9 @@ public class LoadAction extends RestBaseController {
                     return new RestBaseResult(e.getMessage());
                 }
             } else {
-                redirectAddr = selectRedirectBackend(request, groupCommit);
+                long tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName)
+                        .get()).getTable(tableName).get()).getId();
+                redirectAddr = selectRedirectBackend(request, groupCommit, tableId);
             }
 
             LOG.info("redirect load action to destination={}, stream: {}, db: 
{}, tbl: {}, label: {}",
@@ -320,7 +328,7 @@ public class LoadAction extends RestBaseController {
                 return new RestBaseResult("No transaction operation(\'commit\' 
or \'abort\') selected.");
             }
 
-            TNetworkAddress redirectAddr = selectRedirectBackend(request, false);
+            TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1);
             LOG.info("redirect stream load 2PC action to destination={}, db: 
{}, txn: {}, operation: {}",
                     redirectAddr.toString(), dbName, 
request.getHeader(TXN_ID_KEY), txnOperation);
 
@@ -352,7 +360,7 @@ public class LoadAction extends RestBaseController {
         return "";
     }
 
-    private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit)
+    private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId)
             throws LoadException {
        long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L);
         if (debugBackendId != -1L) {
@@ -366,11 +374,12 @@ public class LoadAction extends RestBaseController {
             }
            return selectCloudRedirectBackend(cloudClusterName, request, groupCommit);
         } else {
-            return selectLocalRedirectBackend(groupCommit);
+            return selectLocalRedirectBackend(groupCommit, request, tableId);
         }
     }
 
-    private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws LoadException {
+    private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId)
+            throws LoadException {
         Backend backend = null;
         BeSelectionPolicy policy = null;
         String qualifiedUser = ConnectContext.get().getQualifiedUser();
@@ -390,12 +399,17 @@ public class LoadAction extends RestBaseController {
            throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
         }
         if (groupCommit) {
-            for (Long backendId : backendIds) {
-                Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId);
-                if (!candidateBe.isDecommissioned()) {
-                    backend = candidateBe;
-                    break;
-                }
+            ConnectContext ctx = new ConnectContext();
+            ctx.setEnv(Env.getCurrentEnv());
+            ctx.setThreadLocalInfo();
+            ctx.setRemoteIP(request.getRemoteAddr());
+            ctx.setThreadLocalInfo();
+
+            try {
+                backend = Env.getCurrentEnv().getGroupCommitManager()
+                        .selectBackendForGroupCommit(tableId, ctx, false);
+            } catch (DdlException e) {
+                throw new RuntimeException(e);
             }
         } else {
             backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
@@ -573,10 +587,10 @@ public class LoadAction extends RestBaseController {
                 return new RestBaseResult("No label selected.");
             }
 
-            TNetworkAddress redirectAddr = selectRedirectBackend(request, false);
+            TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1);
 
             LOG.info("Redirect load action with auth token to destination={},"
-                        + "stream: {}, db: {}, tbl: {}, label: {}",
+                            + "stream: {}, db: {}, tbl: {}, label: {}",
                    redirectAddr.toString(), isStreamLoad, dbName, tableName, label);
 
             URI urlObj = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index c4bf1e03c9c..ac365f9166f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -18,21 +18,38 @@
 package org.apache.doris.load;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.util.SlidingWindowCounter;
+import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
 import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.MasterOpExecutor;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 
+import com.google.common.base.Strings;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 public class GroupCommitManager {
 
@@ -40,6 +57,11 @@ public class GroupCommitManager {
 
     private Set<Long> blockedTableIds = new HashSet<>();
 
+    // Table id to BE id map. Only for group commit.
+    private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>();
+    // BE id to pressure map. Only for group commit.
+    private Map<Long, SlidingWindowCounter> tablePressureMap = new ConcurrentHashMap<>();
+
     public boolean isBlock(long tableId) {
         return blockedTableIds.contains(tableId);
     }
@@ -163,4 +185,178 @@ public class GroupCommitManager {
         return size;
     }
 
+    public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud)
+            throws LoadException, DdlException {
+        // If a group commit request is sent to the follower FE, we will send this request to the master FE. master FE
+        // can select a BE and return this BE id to follower FE.
+        if (!Env.getCurrentEnv().isMaster()) {
+            try {
+                long backendId = new MasterOpExecutor(context)
+                        .getGroupCommitLoadBeId(tableId, context.getCloudCluster(), isCloud);
+                return Env.getCurrentSystemInfo().getBackend(backendId);
+            } catch (Exception e) {
+                throw new LoadException(e.getMessage());
+            }
+        } else {
+            // Master FE will select BE by itself.
+            return Env.getCurrentSystemInfo()
+                    .getBackend(selectBackendForGroupCommitInternal(tableId, context.getCloudCluster(), isCloud));
+        }
+    }
+
+    public long selectBackendForGroupCommitInternal(long tableId, String cluster, boolean isCloud)
+            throws LoadException, DdlException {
+        // Understanding Group Commit and Backend Selection Logic
+        //
+        // Group commit is a server-side technique used for batching data imports.
+        // The primary purpose of group commit is to enhance import performance by
+        // reducing the number of versions created for high-frequency, small-batch imports.
+        // Without batching, each import operation creates a separate version, similar to a rowset in an LSM Tree,
+        // which can consume significant compaction resources and degrade system performance.
+        // By batching data, fewer versions are generated from the same amount of data,
+        // thus minimizing compaction and improving performance. For detailed usage,
+        // you can refer to the Group Commit Manual
+        // (https://doris.incubator.apache.org/docs/data-operate/import/group-commit-manual/) .
+        //
+        // The specific backend (BE) selection logic for group commits aims to
+        // direct data belonging to the same table to the same BE for batching.
+        // This is because group commit batches data imported to the same table
+        // on the same BE into a single version, which is then flushed periodically.
+        // For example, if data for the same table is distributed across three BEs,
+        // it will result in three versions.
+        // Conversely, if data for four different tables is directed to the same BE,
+        // it will create four versions. However,
+        // directing all data for the same table to a single BE will only produce one version.
+        //
+        // To optimize performance and avoid overloading a single BE, the strategy for selecting a BE works as follows:
+        //
+        // If a BE is already handling imports for table A and is not under significant load,
+        // the data is sent to this BE.
+        // If the BE is overloaded or if there is no existing record of a BE handling imports for table A,
+        // a BE is chosen at random. This BE is then recorded along with the mapping of table A and its load level.
+        // This approach ensures that group commits can effectively batch data together
+        // while managing the load on each BE efficiently.
+        return isCloud ? selectBackendForCloudGroupCommitInternal(tableId, cluster)
+                : selectBackendForLocalGroupCommitInternal(tableId);
+    }
+
+    private long selectBackendForCloudGroupCommitInternal(long tableId, String cluster)
+            throws DdlException, LoadException {
+        LOG.debug("cloud group commit select be info, tableToBeMap {}, 
tablePressureMap {}", tableToBeMap.toString(),
+                tablePressureMap.toString());
+        if (Strings.isNullOrEmpty(cluster)) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR);
+        }
+
+        Long cachedBackendId = getCachedBackend(tableId);
+        if (cachedBackendId != null) {
+            return cachedBackendId;
+        }
+
+        List<Backend> backends = new ArrayList<>(
+                ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster)
+                        .values());
+        if (backends.isEmpty()) {
+            throw new LoadException("No alive backend");
+        }
+        // If the cached backend is not active or decommissioned, select a random new backend.
+        Long randomBackendId = getRandomBackend(tableId, backends);
+        if (randomBackendId != null) {
+            return randomBackendId;
+        }
+        List<String> backendsInfo = backends.stream()
+                .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive()
+                        + ", decommission=" + be.isDecommissioned() + " }")
+                .collect(Collectors.toList());
+        throw new LoadException("No suitable backend for cloud cluster=" + 
cluster + ", backends = " + backendsInfo);
+    }
+
+    private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException {
+        LOG.debug("group commit select be info, tableToBeMap {}, 
tablePressureMap {}", tableToBeMap.toString(),
+                tablePressureMap.toString());
+        Long cachedBackendId = getCachedBackend(tableId);
+        if (cachedBackendId != null) {
+            return cachedBackendId;
+        }
+
+        List<Backend> backends = new ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends());
+        if (backends.isEmpty()) {
+            throw new LoadException("No alive backend");
+        }
+
+        // If the cached backend is not active or decommissioned, select a random new backend.
+        Long randomBackendId = getRandomBackend(tableId, backends);
+        if (randomBackendId != null) {
+            return randomBackendId;
+        }
+        List<String> backendsInfo = backends.stream()
+                .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive()
+                        + ", decommission=" + be.isDecommissioned() + " }")
+                .collect(Collectors.toList());
+        throw new LoadException("No suitable backend " + ", backends = " + 
backendsInfo);
+    }
+
+    @Nullable
+    private Long getCachedBackend(long tableId) {
+        OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
+        if (tableToBeMap.containsKey(tableId)) {
+            if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) {
+                Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId));
+                if (backend.isActive() && !backend.isDecommissioned()) {
+                    return backend.getId();
+                } else {
+                    tableToBeMap.remove(tableId);
+                }
+            } else {
+                tableToBeMap.remove(tableId);
+            }
+        }
+        return null;
+    }
+
+    @Nullable
+    private Long getRandomBackend(long tableId, List<Backend> backends) {
+        OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
+        Collections.shuffle(backends);
+        for (Backend backend : backends) {
+            if (backend.isActive() && !backend.isDecommissioned()) {
+                tableToBeMap.put(tableId, backend.getId());
+                tablePressureMap.put(tableId,
+                        new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
+                return backend.getId();
+            }
+        }
+        return null;
+    }
+
+    public void updateLoadData(long tableId, long receiveData) {
+        if (tableId == -1) {
+            LOG.warn("invalid table id: " + tableId);
+        }
+        if (!Env.getCurrentEnv().isMaster()) {
+            ConnectContext ctx = new ConnectContext();
+            ctx.setEnv(Env.getCurrentEnv());
+            ctx.setThreadLocalInfo();
+            // set user to ADMIN_USER, so that we can get the proper resource tag
+            ctx.setQualifiedUser(Auth.ADMIN_USER);
+            ctx.setThreadLocalInfo();
+            try {
+                new MasterOpExecutor(ctx).updateLoadData(tableId, receiveData);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            updateLoadDataInternal(tableId, receiveData);
+        }
+    }
+
+    public void updateLoadDataInternal(long tableId, long receiveData) {
+        if (tablePressureMap.containsKey(tableId)) {
+            tablePressureMap.get(tableId).add(receiveData);
+            LOG.info("Update load data for table{}, receiveData {}, 
tablePressureMap {}", tableId, receiveData,
+                    tablePressureMap.toString());
+        } else {
+            LOG.warn("can not find backend id: {}", tableId);
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index 984e8b0c8ca..edf8251f97f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -17,12 +17,15 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.nereids.NereidsPlanner;
@@ -66,6 +69,16 @@ public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor {
                && (tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion));
     }
 
+    @Override
+    protected void beforeExec() {
+        try {
+            this.coordinator.setGroupCommitBe(Env.getCurrentEnv().getGroupCommitManager()
+                    .selectBackendForGroupCommit(table.getId(), ctx, false));
+        } catch (LoadException | DdlException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public void beginTransaction() {
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 09c2f72b5ca..bc3759d4e62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FormatOptions;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
@@ -58,7 +59,6 @@ import org.apache.thrift.TSerializer;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -145,30 +145,13 @@ public class GroupCommitPlanner {
         return future.get();
     }
 
-    // cloud override
     protected void selectBackends(ConnectContext ctx) throws DdlException {
-        backend = ctx.getInsertGroupCommit(this.table.getId());
-        if (backend != null && backend.isAlive() && !backend.isDecommissioned()) {
-            return;
+        try {
+            backend = Env.getCurrentEnv().getGroupCommitManager()
+                    .selectBackendForGroupCommit(this.table.getId(), ctx, false);
+        } catch (LoadException e) {
+            throw new DdlException("No suitable backend");
         }
-
-        List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-        if (allBackendIds.isEmpty()) {
-            throw new DdlException("No alive backend");
-        }
-        Collections.shuffle(allBackendIds);
-        for (Long beId : allBackendIds) {
-            backend = Env.getCurrentSystemInfo().getBackend(beId);
-            if (!backend.isDecommissioned()) {
-                ctx.setInsertGroupCommit(this.table.getId(), backend);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("choose new be {}", backend.getId());
-                }
-                return;
-            }
-        }
-
-        throw new DdlException("No suitable backend");
     }
 
     public Backend getBackend() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 17ea6ceb983..a52bffe308e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -266,6 +266,8 @@ public class Coordinator implements CoordInterface {
 
     private boolean useNereids = false;
 
+    private Backend groupCommitBackend;
+
     // Runtime filter merge instance address and ID
     public TNetworkAddress runtimeFilterMergeAddr;
     public TUniqueId runtimeFilterMergeInstanceId;
@@ -292,6 +294,10 @@ public class Coordinator implements CoordInterface {
     // fragmentid -> backendid
     private MarkedCountDownLatch<Integer, Long> fragmentsDoneLatch = null;
 
+    public void setGroupCommitBe(Backend backend) {
+        this.groupCommitBackend = backend;
+    }
+
     public void setTWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
         this.tWorkloadGroups = tWorkloadGroups;
     }
@@ -1717,8 +1723,11 @@ public class Coordinator implements CoordInterface {
             if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
                 Reference<Long> backendIdRef = new Reference<Long>();
                 TNetworkAddress execHostport;
-                if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || (isAllExternalScan
-                        && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) {
+                if (groupCommitBackend != null) {
+                    execHostport = getGroupCommitBackend(addressToBackendID);
+                } else if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || (
+                        isAllExternalScan
+                                && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) {
                     // 2 cases:
                     // case 1: user set resource tag, we need to use the BE 
with the specified resource tags.
                     // case 2: All scan nodes are external scan node,
@@ -1909,7 +1918,9 @@ public class Coordinator implements CoordInterface {
             if (params.instanceExecParams.isEmpty()) {
                 Reference<Long> backendIdRef = new Reference<Long>();
                 TNetworkAddress execHostport;
-                if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()
+                if (groupCommitBackend != null) {
+                    execHostport = getGroupCommitBackend(addressToBackendID);
+                } else if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()
                         && !addressToBackendID.isEmpty()) {
                     // In this case, we only use the BE where the replica selected by the tag is located to
                     // execute this query. Otherwise, except for the scan node, the rest of the execution nodes
@@ -1933,6 +1944,14 @@ public class Coordinator implements CoordInterface {
         }
     }
 
+    private TNetworkAddress getGroupCommitBackend(Map<TNetworkAddress, Long> addressToBackendID) {
+        // Used for Nereids planner Group commit insert BE select.
+        TNetworkAddress execHostport = new TNetworkAddress(groupCommitBackend.getHost(),
+                groupCommitBackend.getBePort());
+        addressToBackendID.put(execHostport, groupCommitBackend.getId());
+        return execHostport;
+    }
+
     // Traverse the expected runtimeFilterID in each fragment, and establish the corresponding relationship
     // between runtimeFilterID and fragment instance addr and select the merge instance of runtimeFilter
     private void assignRuntimeFilterAddr() throws Exception {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index a14379ddb77..eaab01df556 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TExpr;
 import org.apache.doris.thrift.TExprNode;
+import org.apache.doris.thrift.TGroupCommitInfo;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -101,6 +102,17 @@ public class MasterOpExecutor {
         waitOnReplaying();
     }
 
+    public long getGroupCommitLoadBeId(long tableId, String cluster, boolean isCloud) throws Exception {
+        result = forward(buildGetGroupCommitLoadBeIdParmas(tableId, cluster, isCloud));
+        waitOnReplaying();
+        return result.groupCommitLoadBeId;
+    }
+
+    public void updateLoadData(long tableId, long receiveData) throws Exception {
+        result = forward(buildUpdateLoadDataParams(tableId, receiveData));
+        waitOnReplaying();
+    }
+
     public void cancel() throws Exception {
         TUniqueId queryId = ctx.queryId();
         if (queryId == null) {
@@ -232,6 +244,43 @@ public class MasterOpExecutor {
         return params;
     }
 
+    private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId, String cluster, boolean isCloud) {
+        final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo();
+        groupCommitParams.setGetGroupCommitLoadBeId(true);
+        groupCommitParams.setGroupCommitLoadTableId(tableId);
+        groupCommitParams.setCluster(cluster);
+        groupCommitParams.setIsCloud(isCloud);
+
+        final TMasterOpRequest params = new TMasterOpRequest();
+        // node ident
+        params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
+        params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
+        params.setGroupCommitInfo(groupCommitParams);
+        params.setDb(ctx.getDatabase());
+        params.setUser(ctx.getQualifiedUser());
+        // just make the protocol happy
+        params.setSql("");
+        return params;
+    }
+
+    private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveData) {
+        final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo();
+        groupCommitParams.setUpdateLoadData(true);
+        groupCommitParams.setTableId(tableId);
+        groupCommitParams.setReceiveData(receiveData);
+
+        final TMasterOpRequest params = new TMasterOpRequest();
+        // node ident
+        params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
+        params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
+        params.setGroupCommitInfo(groupCommitParams);
+        params.setDb(ctx.getDatabase());
+        params.setUser(ctx.getQualifiedUser());
+        // just make the protocol happy
+        params.setSql("");
+        return params;
+    }
+
     public ByteBuffer getOutputPacket() {
         if (result == null) {
             return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 130d440fcda..548e9302cb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -56,6 +56,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.PatternMatcher;
@@ -172,6 +173,7 @@ import org.apache.doris.thrift.TGetTablesParams;
 import org.apache.doris.thrift.TGetTablesResult;
 import org.apache.doris.thrift.TGetTabletReplicaInfosRequest;
 import org.apache.doris.thrift.TGetTabletReplicaInfosResult;
+import org.apache.doris.thrift.TGroupCommitInfo;
 import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
 import org.apache.doris.thrift.TInitExternalCtlMetaResult;
 import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
@@ -1032,6 +1034,28 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             result.setPacket("".getBytes());
             return result;
         }
+        if (params.getGroupCommitInfo().isGetGroupCommitLoadBeId()) {
+            final TGroupCommitInfo info = params.getGroupCommitInfo();
+            final TMasterOpResult result = new TMasterOpResult();
+            try {
+                result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager()
+                        .selectBackendForGroupCommitInternal(info.groupCommitLoadTableId, info.cluster, info.isCloud));
+            } catch (LoadException | DdlException e) {
+                throw new TException(e.getMessage());
+            }
+            // just make the protocol happy
+            result.setPacket("".getBytes());
+            return result;
+        }
+        if (params.getGroupCommitInfo().isUpdateLoadData()) {
+            final TGroupCommitInfo info = params.getGroupCommitInfo();
+            final TMasterOpResult result = new TMasterOpResult();
+            Env.getCurrentEnv().getGroupCommitManager()
+                    .updateLoadData(info.tableId, info.receiveData);
+            // just make the protocol happy
+            result.setPacket("".getBytes());
+            return result;
+        }
         if (params.isSetCancelQeury() && params.isCancelQeury()) {
             if (!params.isSetQueryId()) {
                 throw new TException("a query id is needed to cancel a query");
@@ -1633,6 +1657,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                         request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
             }
         }
+        if (request.groupCommit) {
+            try {
+                Env.getCurrentEnv().getGroupCommitManager().updateLoadData(request.table_id, request.receiveBytes);
+            } catch (Exception e) {
+                LOG.warn("Failed to update group commit load data, {}", 
e.getMessage());
+            }
+        }
 
         // get database
         Env env = Env.getCurrentEnv();
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index ecade162304..d48bd756436 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -540,6 +540,16 @@ struct TTxnLoadInfo {
     6: optional list<TSubTxnInfo> subTxnInfos
 }
 
+struct TGroupCommitInfo{
+    1: optional bool getGroupCommitLoadBeId
+    2: optional i64 groupCommitLoadTableId
+    3: optional string cluster
+    4: optional bool isCloud
+    5: optional bool updateLoadData
+    6: optional i64 tableId 
+    7: optional i64 receiveData
+}
+
 struct TMasterOpRequest {
     1: required string user
     2: required string db
@@ -573,6 +583,7 @@ struct TMasterOpRequest {
     28: optional map<string, Exprs.TExprNode> user_variables
     // transaction load
     29: optional TTxnLoadInfo txnLoadInfo
+    30: optional TGroupCommitInfo groupCommitInfo
 
     // selectdb cloud
     1000: optional string cloud_cluster
@@ -606,6 +617,7 @@ struct TMasterOpResult {
     8: optional list<binary> queryResultBufList;
     // transaction load
     9: optional TTxnLoadInfo txnLoadInfo;
+    10: optional i64 groupCommitLoadBeId;
 }
 
 struct TUpdateExportTaskStatusRequest {
@@ -817,6 +829,9 @@ struct TLoadTxnCommitRequest {
     15: optional list<string> tbls
     16: optional i64 table_id
     17: optional string auth_code_uuid
+    18: optional bool groupCommit
+    19: optional i64 receiveBytes
+    20: optional i64 backendId 
 }
 
 struct TLoadTxnCommitResult {

