This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 792bd7c74a1 [cherry-pick](branch-2.1) Pick "[Enhancement](group 
commit)Optimize be select for group commit #35558" (#37830)
792bd7c74a1 is described below

commit 792bd7c74a1b15b057355b55aa30f1a44ccbc1d1
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Wed Jul 24 09:21:07 2024 +0800

    [cherry-pick](branch-2.1) Pick "[Enhancement](group commit)Optimize be 
select for group commit #35558" (#37830)
    
    Pick #35558
---
 be/src/runtime/group_commit_mgr.cpp                |   7 +
 .../doris/common/util/SlidingWindowCounter.java    |  73 ++++++++++
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  44 ++++--
 .../org/apache/doris/load/GroupCommitManager.java  | 160 +++++++++++++++++++++
 .../apache/doris/planner/GroupCommitPlanner.java   |  10 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |  25 +++-
 .../java/org/apache/doris/qe/MasterOpExecutor.java |  47 ++++++
 .../apache/doris/service/FrontendServiceImpl.java  |  31 ++++
 gensrc/thrift/FrontendService.thrift               |  40 ++++++
 9 files changed, 421 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index fd9b96b0c1f..6bbbe88d028 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -397,6 +397,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
         request.__set_db_id(db_id);
         request.__set_table_id(table_id);
         request.__set_txnId(txn_id);
+        request.__set_groupCommit(true);
+        // `state` is null-checked below before its commit infos are read, so it must not be
+        // dereferenced unconditionally here either.
+        if (state) {
+            request.__set_receiveBytes(state->num_bytes_load_total());
+        }
+        if (_exec_env->master_info()->__isset.backend_id) {
+            request.__set_backendId(_exec_env->master_info()->backend_id);
+        } else {
+            LOG(WARNING) << "_exec_env->master_info not set backend_id";
+        }
         if (state) {
             request.__set_commitInfos(state->tablet_commit_infos());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java
new file mode 100644
index 00000000000..787fbb06a2f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.function.LongSupplier;
+
+/**
+ * Sums the values added during the last {@code windowSizeInSeconds} seconds.
+ *
+ * <p>The window is split into one bucket per second, indexed by {@code epochSecond % numberOfBuckets}.
+ * A bucket is cleared lazily the first time it is touched in a newer second, and {@link #get()} only
+ * counts buckets whose timestamp still lies inside the window. {@link #add(long)} and {@link #get()}
+ * are synchronized so that the clear-then-add of a bucket and the window sum stay consistent when
+ * several threads report load at the same time.
+ */
+public class SlidingWindowCounter {
+    private final int windowSizeInSeconds;
+    private final int numberOfBuckets;
+    // Sum of the values added during the second stored in the matching bucketTimestamps slot.
+    private final AtomicLongArray buckets;
+    // Epoch second each bucket currently belongs to (0 = never used).
+    private final AtomicLongArray bucketTimestamps;
+    // Source of the current time in milliseconds.
+    private final LongSupplier clockMillis;
+
+    /**
+     * Creates a counter over the last {@code windowSizeInSeconds} seconds, driven by the system clock.
+     *
+     * @param windowSizeInSeconds window length in seconds; must be positive
+     * @throws IllegalArgumentException if {@code windowSizeInSeconds <= 0}
+     */
+    public SlidingWindowCounter(int windowSizeInSeconds) {
+        this(windowSizeInSeconds, System::currentTimeMillis);
+    }
+
+    /**
+     * Creates a counter driven by the given millisecond clock (e.g. a fake clock in tests).
+     *
+     * @param windowSizeInSeconds window length in seconds; must be positive
+     * @param clockMillis supplier of the current time in milliseconds
+     * @throws IllegalArgumentException if {@code windowSizeInSeconds <= 0}
+     */
+    public SlidingWindowCounter(int windowSizeInSeconds, LongSupplier clockMillis) {
+        if (windowSizeInSeconds <= 0) {
+            throw new IllegalArgumentException("windowSizeInSeconds must be positive: " + windowSizeInSeconds);
+        }
+        this.windowSizeInSeconds = windowSizeInSeconds;
+        this.numberOfBuckets = windowSizeInSeconds; // Each bucket represents 1 second
+        this.buckets = new AtomicLongArray(numberOfBuckets);
+        this.bucketTimestamps = new AtomicLongArray(numberOfBuckets);
+        this.clockMillis = Objects.requireNonNull(clockMillis, "clockMillis");
+    }
+
+    /** Current time in whole seconds according to the configured clock. */
+    private long nowSeconds() {
+        return clockMillis.getAsLong() / 1000;
+    }
+
+    /**
+     * Makes the bucket for {@code nowSec} current, clearing a value left over from an older second
+     * that mapped to the same slot, and returns its index. Callers must hold this object's monitor.
+     */
+    private int rollTo(long nowSec) {
+        int index = (int) Math.floorMod(nowSec, (long) numberOfBuckets);
+        if (bucketTimestamps.get(index) < nowSec) {
+            buckets.set(index, 0);
+            bucketTimestamps.set(index, nowSec);
+        }
+        return index;
+    }
+
+    /**
+     * Adds {@code value} to the bucket of the current second.
+     *
+     * <p>The clock is read exactly once so the bucket that is cleared is the bucket that receives
+     * the value, even if a second boundary passes during the call.
+     */
+    public synchronized void add(long value) {
+        int index = rollTo(nowSeconds());
+        buckets.addAndGet(index, value);
+    }
+
+    /** Returns the sum of the values added within the last {@code windowSizeInSeconds} seconds. */
+    public synchronized long get() {
+        long nowSec = nowSeconds();
+        rollTo(nowSec);
+        long count = 0;
+        for (int i = 0; i < numberOfBuckets; i++) {
+            if (nowSec - bucketTimestamps.get(i) < windowSizeInSeconds) {
+                count += buckets.get(i);
+            }
+        }
+        return count;
+    }
+
+    @Override
+    public String toString() {
+        return String.valueOf(get());
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 259062a828b..edcbc2dd7f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -19,6 +19,7 @@ package org.apache.doris.httpv2.rest;
 
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -128,11 +129,16 @@ public class LoadAction extends RestBaseController {
         String sql = request.getHeader("sql");
         LOG.info("streaming load sql={}", sql);
         boolean groupCommit = false;
+        long tableId = -1;
         String groupCommitStr = request.getHeader("group_commit");
         if (groupCommitStr != null && 
groupCommitStr.equalsIgnoreCase("async_mode")) {
             groupCommit = true;
             try {
                 String[] pair = parseDbAndTb(sql);
+                Database db = Env.getCurrentInternalCatalog()
+                        .getDbOrException(pair[0], s -> new 
TException("database is invalid for dbName: " + s));
+                Table tbl = db.getTableOrException(pair[1], s -> new 
TException("table is invalid: " + s));
+                tableId = tbl.getId();
                 if (isGroupCommitBlock(pair[0], pair[1])) {
                     String msg = "insert table " + pair[1] + " is blocked on 
schema change";
                     return new RestBaseResult(msg);
@@ -150,8 +156,7 @@ public class LoadAction extends RestBaseController {
             }
 
             String label = request.getHeader(LABEL_KEY);
-            TNetworkAddress redirectAddr;
-            redirectAddr = selectRedirectBackend(groupCommit);
+            TNetworkAddress redirectAddr = selectRedirectBackend(request, 
groupCommit, tableId);
 
             LOG.info("redirect load action to destination={}, label: {}",
                     redirectAddr.toString(), label);
@@ -274,7 +279,9 @@ public class LoadAction extends RestBaseController {
                     return new RestBaseResult(e.getMessage());
                 }
             } else {
-                redirectAddr = selectRedirectBackend(groupCommit);
+                // Only the group commit path routes by table (every non-group-commit caller passes -1),
+                // so resolve the id only then: the lookup below throws for a missing db/table and casts
+                // to OlapTable, which must not break ordinary stream loads.
+                long tableId = -1;
+                if (groupCommit) {
+                    tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName)
+                            .get()).getTable(tableName).get()).getId();
+                }
+                redirectAddr = selectRedirectBackend(request, groupCommit, tableId);
             }
 
             LOG.info("redirect load action to destination={}, stream: {}, db: 
{}, tbl: {}, label: {}",
@@ -305,7 +312,7 @@ public class LoadAction extends RestBaseController {
                 return new RestBaseResult("No transaction operation(\'commit\' 
or \'abort\') selected.");
             }
 
-            TNetworkAddress redirectAddr = selectRedirectBackend(false);
+            TNetworkAddress redirectAddr = selectRedirectBackend(request, 
false, -1);
             LOG.info("redirect stream load 2PC action to destination={}, db: 
{}, txn: {}, operation: {}",
                     redirectAddr.toString(), dbName, 
request.getHeader(TXN_ID_KEY), txnOperation);
 
@@ -323,12 +330,18 @@ public class LoadAction extends RestBaseController {
         return index;
     }
 
-    private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws 
LoadException {
+    private TNetworkAddress selectRedirectBackend(HttpServletRequest request, 
boolean groupCommit, long tableId)
+            throws LoadException {
         long debugBackendId = 
DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId",
 -1L);
         if (debugBackendId != -1L) {
             Backend backend = 
Env.getCurrentSystemInfo().getBackend(debugBackendId);
             return new TNetworkAddress(backend.getHost(), 
backend.getHttpPort());
         }
+        return selectLocalRedirectBackend(groupCommit, request, tableId);
+    }
+
+    private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, 
HttpServletRequest request, long tableId)
+            throws LoadException {
         Backend backend = null;
         BeSelectionPolicy policy = null;
         String qualifiedUser = ConnectContext.get().getQualifiedUser();
@@ -348,12 +361,17 @@ public class LoadAction extends RestBaseController {
             throw new 
LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + 
policy);
         }
         if (groupCommit) {
-            for (Long backendId : backendIds) {
-                Backend candidateBe = 
Env.getCurrentSystemInfo().getBackend(backendId);
-                if (!candidateBe.isDecommissioned()) {
-                    backend = candidateBe;
-                    break;
-                }
+            ConnectContext ctx = new ConnectContext();
+            ctx.setEnv(Env.getCurrentEnv());
+            ctx.setRemoteIP(request.getRemoteAddr());
+            // Install ctx as this thread's ConnectContext once it is fully populated; it is also the
+            // context used to forward the selection to the master FE when this FE is a follower.
+            ctx.setThreadLocalInfo();
+
+            try {
+                backend = Env.getCurrentEnv().getGroupCommitManager()
+                        .selectBackendForGroupCommit(tableId, ctx, false);
+            } catch (DdlException e) {
+                // Surface as the LoadException this method declares instead of an unchecked error.
+                throw new LoadException(e.getMessage());
             }
         } else {
             backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
@@ -416,10 +434,10 @@ public class LoadAction extends RestBaseController {
                 return new RestBaseResult("No label selected.");
             }
 
-            TNetworkAddress redirectAddr = selectRedirectBackend(false);
+            TNetworkAddress redirectAddr = selectRedirectBackend(request, 
false, -1);
 
             LOG.info("Redirect load action with auth token to destination={},"
-                        + "stream: {}, db: {}, tbl: {}, label: {}",
+                            + "stream: {}, db: {}, tbl: {}, label: {}",
                     redirectAddr.toString(), isStreamLoad, dbName, tableName, 
label);
 
             URI urlObj = null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index c4bf1e03c9c..1ec6a06179e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -18,9 +18,16 @@
 package org.apache.doris.load;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.util.SlidingWindowCounter;
+import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
 import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.MasterOpExecutor;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -28,11 +35,17 @@ import org.apache.doris.thrift.TStatusCode;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 public class GroupCommitManager {
 
@@ -40,6 +53,11 @@ public class GroupCommitManager {
 
     private Set<Long> blockedTableIds = new HashSet<>();
 
+    // Table id -> id of the BE currently chosen to receive that table's group commit loads.
+    // Only for group commit.
+    private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>();
+    // Table id -> sliding-window sum of bytes received for that table's group commit loads
+    // (incremented by updateLoadDataInternal). Keyed by table id, not BE id. Only for group commit.
+    private Map<Long, SlidingWindowCounter> tablePressureMap = new ConcurrentHashMap<>();
+
     public boolean isBlock(long tableId) {
         return blockedTableIds.contains(tableId);
     }
@@ -163,4 +181,146 @@ public class GroupCommitManager {
         return size;
     }
 
+    /**
+     * Picks the BE that should receive a group commit load into {@code tableId}.
+     *
+     * <p>The master FE selects the BE itself. A follower forwards the request to the master, which
+     * owns the table-to-BE assignment, and resolves the returned BE id locally.
+     *
+     * @param tableId id of the target table
+     * @param context connection context used to forward the request to the master (followers only)
+     * @param isCloud currently unused
+     * @return the selected backend, never {@code null}
+     * @throws LoadException if no backend could be selected, forwarding to the master failed, or the
+     *                       selected backend id is unknown to this FE
+     * @throws DdlException propagated from the master-side selection
+     */
+    public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud)
+            throws LoadException, DdlException {
+        long backendId;
+        if (Env.getCurrentEnv().isMaster()) {
+            // Master FE will select BE by itself.
+            backendId = selectBackendForGroupCommitInternal(tableId);
+        } else {
+            // If a group commit request is sent to the follower FE, we send it to the master FE, which
+            // selects a BE and returns its id.
+            try {
+                backendId = new MasterOpExecutor(context).getGroupCommitLoadBeId(tableId);
+            } catch (Exception e) {
+                LOG.warn("failed to get group commit backend from master for table {}", tableId, e);
+                throw new LoadException(e.getMessage());
+            }
+        }
+        Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+        if (backend == null) {
+            // The id may refer to a BE that was dropped meanwhile; fail here instead of returning null.
+            throw new LoadException("group commit selected backend " + backendId + " for table " + tableId
+                    + ", but it does not exist");
+        }
+        return backend;
+    }
+
+    /**
+     * Chooses the BE id for a group commit load into {@code tableId}. This is the selection the
+     * master FE performs, either directly or on behalf of a follower that forwarded the request.
+     *
+     * <p>Background: group commit is a server-side technique for batching data imports. It enhances
+     * import performance by reducing the number of versions created for high-frequency, small-batch
+     * imports. Without batching, each import creates a separate version, similar to a rowset in an
+     * LSM tree, which consumes significant compaction resources and degrades system performance.
+     * Batching produces fewer versions for the same amount of data. For detailed usage see the
+     * Group Commit Manual (https://doris.apache.org/docs/data-operate/import/group-commit-manual/).
+     *
+     * <p>A BE batches the data it receives for one table into a single version that is flushed
+     * periodically. Data for one table spread over three BEs therefore yields three versions, and
+     * four tables sent to one BE yield four versions. Routing all data of a table to a single BE
+     * yields just one.
+     *
+     * <p>To batch effectively without overloading a single BE, the strategy is:
+     * <ul>
+     *   <li>if a BE is already handling imports for the table and is not under significant load,
+     *       the data is sent to that BE;</li>
+     *   <li>if that BE is overloaded, or no BE is recorded for the table yet, a BE is chosen at
+     *       random and recorded together with a fresh load counter for the table.</li>
+     * </ul>
+     *
+     * @param tableId id of the target table
+     * @return id of the selected backend
+     * @throws LoadException if no suitable backend exists
+     * @throws DdlException declared for callers
+     */
+    public long selectBackendForGroupCommitInternal(long tableId)
+            throws LoadException, DdlException {
+        final long chosenBackendId = selectBackendForLocalGroupCommitInternal(tableId);
+        return chosenBackendId;
+    }
+
+    /**
+     * Selects a BE for {@code tableId}. The BE recorded for the table is reused while it is usable;
+     * otherwise a random alive, non-decommissioned BE is picked and recorded.
+     *
+     * @return the selected backend id
+     * @throws LoadException if there are no backends, or none is alive and not decommissioned
+     */
+    private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException {
+        // Guarded: this runs for every group commit load, and rendering both maps is not free.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap,
+                    tablePressureMap);
+        }
+        Long cachedBackendId = getCachedBackend(tableId);
+        if (cachedBackendId != null) {
+            return cachedBackendId;
+        }
+
+        // getRandomBackend shuffles the list in place, so hand it a private copy.
+        List<Backend> backends = new ArrayList<>(Env.getCurrentSystemInfo().getAllBackends());
+        if (backends.isEmpty()) {
+            throw new LoadException("No alive backend");
+        }
+
+        // No usable cached backend (missing, overloaded, dead or decommissioned): select a random new one.
+        Long randomBackendId = getRandomBackend(tableId, backends);
+        if (randomBackendId != null) {
+            return randomBackendId;
+        }
+        List<String> backendsInfo = backends.stream()
+                .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive()
+                        + ", decommission=" + be.isDecommissioned() + " }")
+                .collect(Collectors.toList());
+        throw new LoadException("No suitable backend, backends = " + backendsInfo);
+    }
+
+    /**
+     * Returns the BE recorded for {@code tableId} if it can keep receiving the table's group commit
+     * data, otherwise drops the record and returns {@code null} so the caller picks a new BE.
+     *
+     * <p>The record is kept while the table still exists as an OLAP table, its recent received bytes
+     * stay below the table's group-commit data-bytes threshold, and the BE exists, is alive and is
+     * not decommissioned. Each map is read once, so a concurrent update or removal cannot cause a
+     * null dereference between check and use. Removal is conditional on the BE id read here, so a
+     * newer assignment made concurrently is not discarded.
+     */
+    @Nullable
+    private Long getCachedBackend(long tableId) {
+        Long backendId = tableToBeMap.get(tableId);
+        if (backendId == null) {
+            return null;
+        }
+        Object table = Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
+        if (!(table instanceof OlapTable)) {
+            // Table dropped (or not OLAP): its threshold cannot be evaluated, forget the assignment.
+            tableToBeMap.remove(tableId, backendId);
+            return null;
+        }
+        SlidingWindowCounter pressure = tablePressureMap.get(tableId);
+        // A missing counter means the assignment was only just published and has no load yet.
+        long recentBytes = pressure == null ? 0 : pressure.get();
+        if (recentBytes >= ((OlapTable) table).getGroupCommitDataBytes()) {
+            // This BE already received enough data for the table within the window; spread the load.
+            tableToBeMap.remove(tableId, backendId);
+            return null;
+        }
+        Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+        if (backend == null || !backend.isAlive() || backend.isDecommissioned()) {
+            tableToBeMap.remove(tableId, backendId);
+            return null;
+        }
+        return backendId;
+    }
+
+    /**
+     * Picks a random alive, non-decommissioned BE from {@code backends} (shuffled in place) and, when
+     * the table still exists as an OLAP table, records it for {@code tableId} with a fresh load counter
+     * spanning the table's group commit interval.
+     *
+     * @return the chosen backend id, or {@code null} if no backend is usable
+     */
+    @Nullable
+    private Long getRandomBackend(long tableId, List<Backend> backends) {
+        Object table = Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
+        Collections.shuffle(backends);
+        for (Backend backend : backends) {
+            if (!backend.isAlive() || backend.isDecommissioned()) {
+                continue;
+            }
+            if (table instanceof OlapTable) {
+                // Publish the counter before the mapping so a reader never sees a mapping without it.
+                tablePressureMap.put(tableId,
+                        new SlidingWindowCounter(((OlapTable) table).getGroupCommitIntervalMs() / 1000 + 1));
+                tableToBeMap.put(tableId, backend.getId());
+            }
+            return backend.getId();
+        }
+        return null;
+    }
+
+    /**
+     * Records {@code receiveData} bytes received by group commit loads of {@code tableId}.
+     * The per-table counters live on the master FE, so a follower forwards the update there.
+     *
+     * @throws RuntimeException wrapping any failure to forward the update to the master
+     */
+    public void updateLoadData(long tableId, long receiveData) {
+        if (tableId == -1) {
+            // No table to attribute the bytes to; there is nothing meaningful to record or forward.
+            LOG.warn("invalid table id: {}", tableId);
+            return;
+        }
+        if (Env.getCurrentEnv().isMaster()) {
+            updateLoadDataInternal(tableId, receiveData);
+            return;
+        }
+        ConnectContext ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        // set user to ADMIN_USER, so that we can get the proper resource tag
+        ctx.setQualifiedUser(Auth.ADMIN_USER);
+        ctx.setThreadLocalInfo();
+        try {
+            new MasterOpExecutor(ctx).updateLoadData(tableId, receiveData);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Adds {@code receiveData} bytes to the pressure counter of {@code tableId}. Counters exist only
+     * for tables that currently have a group commit BE assigned; other tables are logged and ignored.
+     */
+    public void updateLoadDataInternal(long tableId, long receiveData) {
+        SlidingWindowCounter pressure = tablePressureMap.get(tableId);
+        if (pressure == null) {
+            LOG.warn("can not find group commit pressure counter for table id: {}", tableId);
+            return;
+        }
+        pressure.add(receiveData);
+        // Called on every group commit txn commit: keep the full-map dump out of the INFO log.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Update load data for table {}, receiveData {}, tablePressureMap {}", tableId, receiveData,
+                    tablePressureMap);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 6e235443bc3..0b051aeb888 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -30,6 +30,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FormatOptions;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
@@ -202,6 +203,15 @@ public class GroupCommitPlanner {
         }
     }
 
+    /**
+     * Selects the BE this planner sends its group commit insert to and stores it in {@code backend}.
+     *
+     * @param ctx connection context, used to forward the selection to the master FE on a follower
+     * @throws DdlException if no suitable backend could be selected; the message keeps the
+     *                      "No suitable backend" prefix and appends the underlying reason
+     */
+    protected void selectBackends(ConnectContext ctx) throws DdlException {
+        Backend selected;
+        try {
+            selected = Env.getCurrentEnv().getGroupCommitManager()
+                    .selectBackendForGroupCommit(this.table.getId(), ctx, false);
+        } catch (LoadException e) {
+            throw new DdlException("No suitable backend for table " + this.table.getId() + ": " + e.getMessage());
+        }
+        if (selected == null) {
+            // Never store a null backend: later users of getBackend() would fail far from the cause.
+            throw new DdlException("No suitable backend for table " + this.table.getId());
+        }
+        backend = selected;
+    }
+
     public Backend getBackend() {
         return backend;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 0f0c0b799f6..2904cfdb5e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -273,6 +273,8 @@ public class Coordinator implements CoordInterface {
     private boolean enablePipelineXEngine = false;
     private boolean useNereids = false;
 
+    private Backend groupCommitBackend;
+
     // Runtime filter merge instance address and ID
     public TNetworkAddress runtimeFilterMergeAddr;
     public TUniqueId runtimeFilterMergeInstanceId;
@@ -298,6 +300,10 @@ public class Coordinator implements CoordInterface {
     // fragmentid -> backendid
     private MarkedCountDownLatch<Integer, Long> fragmentsDoneLatch = null;
 
+    /**
+     * Pins the BE used by this coordinator's group commit insert. When non-null, it is used as the
+     * execution host for unpartitioned fragments and for fragments that have no instance yet.
+     */
+    public void setGroupCommitBe(Backend be) {
+        groupCommitBackend = be;
+    }
+
     public void setTWorkloadGroups(List<TPipelineWorkloadGroup> 
tWorkloadGroups) {
         this.tWorkloadGroups = tWorkloadGroups;
     }
@@ -1955,8 +1961,11 @@ public class Coordinator implements CoordInterface {
             if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
                 Reference<Long> backendIdRef = new Reference<Long>();
                 TNetworkAddress execHostport;
-                if (((ConnectContext.get() != null && 
ConnectContext.get().isResourceTagsSet()) || (isAllExternalScan
-                        && Config.prefer_compute_node_for_external_table)) && 
!addressToBackendID.isEmpty()) {
+                if (groupCommitBackend != null) {
+                    execHostport = getGroupCommitBackend(addressToBackendID);
+                } else if (((ConnectContext.get() != null && 
ConnectContext.get().isResourceTagsSet()) || (
+                        isAllExternalScan
+                                && 
Config.prefer_compute_node_for_external_table)) && 
!addressToBackendID.isEmpty()) {
                     // 2 cases:
                     // case 1: user set resource tag, we need to use the BE 
with the specified resource tags.
                     // case 2: All scan nodes are external scan node,
@@ -2148,7 +2157,9 @@ public class Coordinator implements CoordInterface {
             if (params.instanceExecParams.isEmpty()) {
                 Reference<Long> backendIdRef = new Reference<Long>();
                 TNetworkAddress execHostport;
-                if (ConnectContext.get() != null && 
ConnectContext.get().isResourceTagsSet()
+                if (groupCommitBackend != null) {
+                    execHostport = getGroupCommitBackend(addressToBackendID);
+                } else if (ConnectContext.get() != null && 
ConnectContext.get().isResourceTagsSet()
                         && !addressToBackendID.isEmpty()) {
                     // In this case, we only use the BE where the replica 
selected by the tag is located to
                     // execute this query. Otherwise, except for the scan 
node, the rest of the execution nodes
@@ -2172,6 +2183,14 @@ public class Coordinator implements CoordInterface {
         }
     }
 
+    /**
+     * Returns the BE-port address of the pinned group commit BE and registers it in
+     * {@code addressToBackendID} so later lookups resolve to that BE.
+     * Used for Nereids planner group commit insert BE selection.
+     */
+    private TNetworkAddress getGroupCommitBackend(Map<TNetworkAddress, Long> addressToBackendID) {
+        String host = groupCommitBackend.getHost();
+        int bePort = groupCommitBackend.getBePort();
+        TNetworkAddress hostPort = new TNetworkAddress(host, bePort);
+        addressToBackendID.put(hostPort, groupCommitBackend.getId());
+        return hostPort;
+    }
+
     // Traverse the expected runtimeFilterID in each fragment, and establish 
the corresponding relationship
     // between runtimeFilterID and fragment instance addr and select the merge 
instance of runtimeFilter
     private void assignRuntimeFilterAddr() throws Exception {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 934c221905f..9ff3acecdd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TExpr;
 import org.apache.doris.thrift.TExprNode;
+import org.apache.doris.thrift.TGroupCommitInfo;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -88,6 +89,17 @@ public class MasterOpExecutor {
         waitOnReplaying();
     }
 
+    /**
+     * Asks the master FE to choose the BE for a group commit load into {@code tableId}.
+     *
+     * @return id of the BE selected by the master
+     * @throws Exception if forwarding fails or the master's reply carries no BE id
+     */
+    public long getGroupCommitLoadBeId(long tableId) throws Exception {
+        result = forward(buildGetGroupCommitLoadBeIdParmas(tableId));
+        waitOnReplaying();
+        if (!result.isSetGroupCommitLoadBeId()) {
+            // An unset optional i64 reads as 0, which would silently resolve to no backend.
+            throw new DdlException("master FE did not return a group commit backend for table " + tableId);
+        }
+        return result.getGroupCommitLoadBeId();
+    }
+
+    /**
+     * Reports {@code receiveData} bytes received by group commit loads of {@code tableId} to the
+     * master FE.
+     */
+    public void updateLoadData(long tableId, long receiveData) throws Exception {
+        result = forward(buildUpdateLoadDataParams(tableId, receiveData));
+        waitOnReplaying();
+    }
+
     private void waitOnReplaying() throws DdlException {
         LOG.info("forwarding to master get result max journal id: {}", 
result.maxJournalId);
         ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, 
waitTimeoutMs);
@@ -187,6 +199,41 @@ public class MasterOpExecutor {
         return params;
     }
 
+    /** Builds the request asking the master to select a group commit BE for {@code tableId}. */
+    private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId) {
+        final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo();
+        groupCommitParams.setGetGroupCommitLoadBeId(true);
+        groupCommitParams.setGroupCommitLoadTableId(tableId);
+        return buildGroupCommitRequest(groupCommitParams);
+    }
+
+    /** Builds the request reporting {@code receiveData} group commit bytes for {@code tableId}. */
+    private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveData) {
+        final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo();
+        groupCommitParams.setUpdateLoadData(true);
+        groupCommitParams.setTableId(tableId);
+        groupCommitParams.setReceiveData(receiveData);
+        return buildGroupCommitRequest(groupCommitParams);
+    }
+
+    /**
+     * Wraps {@code groupCommitInfo} in a master-op request that identifies this FE as the sender
+     * and carries the context's database and user.
+     */
+    private TMasterOpRequest buildGroupCommitRequest(TGroupCommitInfo groupCommitInfo) {
+        final TMasterOpRequest params = new TMasterOpRequest();
+        // node ident
+        params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
+        params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
+        params.setGroupCommitInfo(groupCommitInfo);
+        params.setDb(ctx.getDatabase());
+        params.setUser(ctx.getQualifiedUser());
+        // just make the protocol happy
+        params.setSql("");
+        return params;
+    }
+
     public ByteBuffer getOutputPacket() {
         if (result == null) {
             return null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index bf2adf5591b..448b1231dde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -53,6 +53,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.PatternMatcher;
@@ -170,6 +171,7 @@ import org.apache.doris.thrift.TGetTablesParams;
 import org.apache.doris.thrift.TGetTablesResult;
 import org.apache.doris.thrift.TGetTabletReplicaInfosRequest;
 import org.apache.doris.thrift.TGetTabletReplicaInfosResult;
+import org.apache.doris.thrift.TGroupCommitInfo;
 import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
 import org.apache.doris.thrift.TInitExternalCtlMetaResult;
 import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
@@ -1008,6 +1010,28 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             result.setPacket("".getBytes());
             return result;
         }
+        // groupCommitInfo is optional: ordinary forwarded statements do not set it, so it must be
+        // checked before being dereferenced.
+        if (params.isSetGroupCommitInfo()) {
+            // Group commit requests forwarded by a follower FE carry an empty SQL string and are
+            // answered here instead of being executed as a statement.
+            final TGroupCommitInfo info = params.getGroupCommitInfo();
+            if (info.isGetGroupCommitLoadBeId()) {
+                final TMasterOpResult result = new TMasterOpResult();
+                try {
+                    result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager()
+                            .selectBackendForGroupCommitInternal(info.groupCommitLoadTableId));
+                } catch (LoadException | DdlException e) {
+                    throw new TException(e.getMessage(), e);
+                }
+                // just make the protocol happy
+                result.setPacket("".getBytes());
+                return result;
+            }
+            if (info.isUpdateLoadData()) {
+                final TMasterOpResult result = new TMasterOpResult();
+                Env.getCurrentEnv().getGroupCommitManager()
+                        .updateLoadData(info.tableId, info.receiveData);
+                // just make the protocol happy
+                result.setPacket("".getBytes());
+                return result;
+            }
+        }
 
         // add this log so that we can track this stmt
         if (LOG.isDebugEnabled()) {
@@ -1567,6 +1591,13 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                         request.getTbl(), request.getUserIp(), 
PrivPredicate.LOAD);
             }
         }
+        if (request.groupCommit) {
+            try {
+                
Env.getCurrentEnv().getGroupCommitManager().updateLoadData(request.table_id, 
request.receiveBytes);
+            } catch (Exception e) {
+                LOG.warn("Failed to update group commit load data, {}", 
e.getMessage());
+            }
+        }
 
         // get database
         Env env = Env.getCurrentEnv();
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index f1bf6eadd60..d0b45d72647 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -492,6 +492,36 @@ struct TFeResult {
     1: required FrontendServiceVersion protocolVersion
     2: required Status.TStatus status
 }
+
+// NOTE(review): TSubTxnType, TSubTxnInfo and TTxnLoadInfo are not referenced by any code in this
+// patch. Only TMasterOpRequest.txnLoadInfo / TMasterOpResult.txnLoadInfo (marked "transaction load")
+// use them, so they appear to be carried along by the cherry-pick. Confirm they belong here.
+enum TSubTxnType {
+    INSERT = 0,
+    DELETE = 1
+}
+
+// Per-table sub-transaction of a transaction load (presumably one per target table; verify).
+struct TSubTxnInfo {
+    1: optional i64 sub_txn_id
+    2: optional i64 table_id
+    3: optional list<Types.TTabletCommitInfo> tablet_commit_infos
+    4: optional TSubTxnType sub_txn_type
+}
+
+// Transaction-load state exchanged through TMasterOpRequest / TMasterOpResult.txnLoadInfo.
+struct TTxnLoadInfo {
+    1: optional string label
+    2: optional i64 dbId
+    3: optional i64 txnId
+    4: optional i64 timeoutTimestamp
+    5: optional i64 allSubTxnNum
+    6: optional list<TSubTxnInfo> subTxnInfos
+}
+
+// Group commit request forwarded from a follower FE to the master FE in
+// TMasterOpRequest.groupCommitInfo. Each request built by MasterOpExecutor sets exactly one of the
+// two operation flags below.
+struct TGroupCommitInfo{
+    // Ask the master to choose the BE for a group commit load into groupCommitLoadTableId;
+    // the chosen id is returned in TMasterOpResult.groupCommitLoadBeId.
+    1: optional bool getGroupCommitLoadBeId
+    2: optional i64 groupCommitLoadTableId
+    // Report receiveData bytes received by group commit loads of tableId to the master.
+    3: optional bool updateLoadData
+    4: optional i64 tableId
+    5: optional i64 receiveData
+}
+
 struct TMasterOpRequest {
     1: required string user
     2: required string db
@@ -523,6 +553,9 @@ struct TMasterOpRequest {
     26: optional string defaultDatabase
     27: optional bool cancel_qeury // if set to true, this request means to 
cancel one forwarded query, and query_id needs to be set
     28: optional map<string, Exprs.TExprNode> user_variables
+    // transaction load
+    29: optional TTxnLoadInfo txnLoadInfo
+    30: optional TGroupCommitInfo groupCommitInfo
 }
 
 struct TColumnDefinition {
@@ -550,6 +583,9 @@ struct TMasterOpResult {
     6: optional i32 statusCode;
     7: optional string errMessage;
     8: optional list<binary> queryResultBufList;
+    // transaction load
+    9: optional TTxnLoadInfo txnLoadInfo;
+    10: optional i64 groupCommitLoadBeId;
 }
 
 struct TUpdateExportTaskStatusRequest {
@@ -750,6 +786,10 @@ struct TLoadTxnCommitRequest {
     14: optional i64 db_id
     15: optional list<string> tbls
     16: optional i64 table_id
+    17: optional string auth_code_uuid
+    18: optional bool groupCommit
+    19: optional i64 receiveBytes
+    20: optional i64 backendId 
 }
 
 struct TLoadTxnCommitResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to