This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 65661ffff67 [opt](group-commit) Skip createLocation in group commit 
stream load sink (#63561)
65661ffff67 is described below

commit 65661ffff6757d9e5254cfff8ebfb049542a0fc2
Author: Xin Liao <[email protected]>
AuthorDate: Tue May 26 18:11:33 2026 +0800

    [opt](group-commit) Skip createLocation in group commit stream load sink 
(#63561)
    
    ## Summary
    
    The BE-side `GroupCommitBlockSinkOperatorX::init` does **not** consume
    `TOlapTableSink.location` or `TOlapTableSink.slave_location` (it only
    reads `tuple_id` / `schema` / `db_id` / `table_id` / `partition` /
    `group_commit_mode` / `load_id` / `max_filter_ratio`). However, the FE
    still runs `createLocation`, which visits every replica of every tablet
    (`O(partitions * indexes * tablets * replicas)` work) and, for each
    replica, acquires the read side of the `CloudSystemInfoService`
    read-write lock via `CloudReplica.getCurrentClusterId`.
    
    Under high-concurrency group commit stream load on tables with many
    partitions (3000+ partitions in a real production incident), CAS
    contention on the cache line holding the RW lock's `state` field
    saturated all FE CPUs, and the cluster could not recover even after
    scaling out: adding cores only adds CAS contenders, which makes the
    contention worse.
    
    ## Change
    
    - Introduce a `protected initLocationParams(TOlapTableSink)` hook on
    `OlapTableSink`. The default implementation delegates to
    `createLocation`, so non-group-commit sinks are unaffected.
    - Route both `init(...)` overloads in `OlapTableSink` through the hook.
    - `GroupCommitBlockSink` overrides the hook to return two placeholder
    `TOlapTableLocationParam` objects (location and slave location), each
    with an empty tablet list. `TOlapTableSink.location` is a required
    thrift field, so non-null placeholders are still set, but no
    tablet/replica enumeration happens.
    
    Effect on the group-commit path:
    - Per-request FE CPU spent building locations:
    `O(partitions * indexes * tablets * replicas)` → `O(1)`
    - `CloudSystemInfoService` RW lock acquisitions from `createLocation`:
    one per replica (hundreds of concurrent CAS spinners under load) → 0
---
 .../apache/doris/planner/GroupCommitBlockSink.java | 21 +++++++
 .../org/apache/doris/planner/OlapTableSink.java    | 13 +++-
 .../doris/planner/GroupCommitBlockSinkTest.java    | 69 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
index a45b65ca19a..0b1ef04496f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
@@ -20,12 +20,18 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.UserException;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TGroupCommitMode;
+import org.apache.doris.thrift.TOlapTableLocationParam;
+import org.apache.doris.thrift.TOlapTableSink;
+import org.apache.doris.thrift.TTabletLocation;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -63,6 +69,21 @@ public class GroupCommitBlockSink extends OlapTableSink {
         return tDataSink;
     }
 
+    // BE-side GroupCommitBlockSinkOperatorX::init does not consume 
location/slave_location
+    // (it only reads 
tuple_id/schema/db_id/table_id/partition/group_commit_mode/load_id/
+    // max_filter_ratio). Skip the per-tablet replica enumeration in 
createLocation, which
+    // is the dominant FE CPU cost under high-concurrency group-commit stream 
load.
+    // We still return placeholder TOlapTableLocationParam objects because
+    // TOlapTableSink.location is a required thrift field.
+    @Override
+    protected List<TOlapTableLocationParam> initLocationParams(TOlapTableSink 
tSink) throws UserException {
+        TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
+        TOlapTableLocationParam slaveLocationParam = new 
TOlapTableLocationParam();
+        locationParam.setTablets(Lists.<TTabletLocation>newArrayList());
+        slaveLocationParam.setTablets(Lists.<TTabletLocation>newArrayList());
+        return Arrays.asList(locationParam, slaveLocationParam);
+    }
+
     public static TGroupCommitMode parseGroupCommit(String groupCommit) {
         if (groupCommit == null) {
             return null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 4d95fababc1..e3a9954b627 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -236,7 +236,7 @@ public class OlapTableSink extends DataSink {
                 partition.setTabletVersionGapBackends(gapBackends);
             }
         }
-        tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable);
+        tOlapTableLocationParams = initLocationParams(tSink);
 
         tSink.setTableId(dstTable.getId());
         tSink.setTupleId(tupleDescriptor.getId().asInt());
@@ -294,7 +294,7 @@ public class OlapTableSink extends DataSink {
                 partition.setTabletVersionGapBackends(gapBackends);
             }
         }
-        tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable);
+        tOlapTableLocationParams = initLocationParams(tSink);
 
         tSink.setTableId(dstTable.getId());
         tSink.setTupleId(tupleDescriptor.getId().asInt());
@@ -737,6 +737,15 @@ public class OlapTableSink extends DataSink {
         }
     }
 
+    // Hook for subclasses to control how the tablet location params are 
populated.
+    // Default behavior computes the full tablet -> backend mapping via 
createLocation,
+    // which under high-concurrency stream load on large tables is the 
dominant FE CPU
+    // cost. Subclasses whose BE counterpart does not consume 
TOlapTableSink.location
+    // (e.g. GroupCommitBlockSink) can override this hook to skip that work.
+    protected List<TOlapTableLocationParam> initLocationParams(TOlapTableSink 
tSink) throws UserException {
+        return createLocation(tSink.getDbId(), dstTable);
+    }
+
     public List<TOlapTableLocationParam> createDummyLocation(OlapTable table) 
throws UserException {
         TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
         TOlapTableLocationParam slaveLocationParam = new 
TOlapTableLocationParam();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/GroupCommitBlockSinkTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/GroupCommitBlockSinkTest.java
new file mode 100644
index 00000000000..281afd9b59e
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/GroupCommitBlockSinkTest.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.thrift.TGroupCommitMode;
+import org.apache.doris.thrift.TOlapTableLocationParam;
+import org.apache.doris.thrift.TOlapTableSink;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+public class GroupCommitBlockSinkTest {
+
+    @Test
+    public void testInitLocationParamsSkipsCreateLocation() throws 
UserException {
+        OlapTable dstTable = Mockito.mock(OlapTable.class);
+        TupleDescriptor tuple = Mockito.mock(TupleDescriptor.class);
+        GroupCommitBlockSink sink = new GroupCommitBlockSink(
+                dstTable, tuple, Lists.newArrayList(1L), false, "async_mode", 
0.0);
+
+        List<TOlapTableLocationParam> params = sink.initLocationParams(new 
TOlapTableSink());
+
+        Assert.assertEquals(2, params.size());
+        Assert.assertNotNull(params.get(0).getTablets());
+        Assert.assertTrue("master location should be empty placeholder",
+                params.get(0).getTablets().isEmpty());
+        Assert.assertNotNull(params.get(1).getTablets());
+        Assert.assertTrue("slave location should be empty placeholder",
+                params.get(1).getTablets().isEmpty());
+        Mockito.verifyNoInteractions(dstTable);
+        Mockito.verifyNoInteractions(tuple);
+    }
+
+    @Test
+    public void testParseGroupCommit() {
+        Assert.assertEquals(TGroupCommitMode.ASYNC_MODE,
+                GroupCommitBlockSink.parseGroupCommit("async_mode"));
+        Assert.assertEquals(TGroupCommitMode.ASYNC_MODE,
+                GroupCommitBlockSink.parseGroupCommit("ASYNC_MODE"));
+        Assert.assertEquals(TGroupCommitMode.SYNC_MODE,
+                GroupCommitBlockSink.parseGroupCommit("sync_mode"));
+        Assert.assertEquals(TGroupCommitMode.OFF_MODE,
+                GroupCommitBlockSink.parseGroupCommit("off_mode"));
+        Assert.assertNull(GroupCommitBlockSink.parseGroupCommit(null));
+        Assert.assertNull(GroupCommitBlockSink.parseGroupCommit("invalid"));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to