This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 6bec20afd6c branch-3.1: [enhance](transaction) add completeness of
commit tablet info check in cloud mode #53979 (#54305)
6bec20afd6c is described below
commit 6bec20afd6cf6c2b176dcc84b23454c03e28ec3a
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Aug 5 20:18:12 2025 +0800
branch-3.1: [enhance](transaction) add completeness of commit tablet info
check in cloud mode #53979 (#54305)
Cherry-picked from #53979
Co-authored-by: hui lai <[email protected]>
---
be/src/vec/sink/writer/vtablet_writer.cpp | 10 ++++
.../transaction/CloudGlobalTransactionMgr.java | 70 ++++++++++++++++++++++
.../test_incomplete_commit_info.groovy | 63 +++++++++++++++++++
3 files changed, 143 insertions(+)
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index f6d8b844b18..fdb75a31d87 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1014,7 +1014,17 @@ void VNodeChannel::_add_block_success_callback(const
PTabletWriterAddBlockResult
if (!st.ok()) {
_cancel_with_msg(st.to_string());
} else if (ctx._is_last_rpc) {
+ bool skip_tablet_info = false;
+
DBUG_EXECUTE_IF("VNodeChannel.add_block_success_callback.incomplete_commit_info",
+ { skip_tablet_info = true; });
for (const auto& tablet : result.tablet_vec()) {
+
DBUG_EXECUTE_IF("VNodeChannel.add_block_success_callback.incomplete_commit_info",
{
+ if (skip_tablet_info) {
+ LOG(INFO) << "skip tablet info: " <<
tablet.tablet_id();
+ skip_tablet_info = false;
+ continue;
+ }
+ });
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet.tablet_id();
commit_info.backendId = _node_id;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index b7443dbaab4..2abbc072b9f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -72,6 +72,7 @@ import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.LoadException;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
@@ -687,6 +688,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long
transactionId, boolean is2PC)
throws UserException {
+ checkCommitInfo(commitTxnRequest);
+
CommitTxnResponse commitTxnResponse = null;
TransactionState txnState = null;
int retryTime = 0;
@@ -748,6 +751,73 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
return txnState;
}
+ private void checkCommitInfo(CommitTxnRequest commitTxnRequest) throws
UserException {
+ List<Long> commitTabletIds = Lists.newArrayList();
+ List<Long> commitIndexIds = Lists.newArrayList();
+ commitTabletIds.addAll(commitTxnRequest.getBaseTabletIdsList());
+ for (SubTxnInfo subTxnInfo : commitTxnRequest.getSubTxnInfosList()) {
+ commitTabletIds.addAll(subTxnInfo.getBaseTabletIdsList());
+ }
+ if (commitTabletIds.isEmpty()) {
+ return;
+ }
+
+ TabletInvertedIndex tabletInvertedIndex =
Env.getCurrentEnv().getTabletInvertedIndex();
+ List<TabletMeta> tabletMetaList =
tabletInvertedIndex.getTabletMetaList(commitTabletIds);
+ Map<OlapTable, Set<Long>> tableToPartition = Maps.newHashMap();
+ for (int i = 0; i < tabletMetaList.size(); i++) {
+ TabletMeta tabletMeta = tabletMetaList.get(i);
+ if (tabletMeta == null) {
+ continue;
+ }
+ long tableId = tabletMeta.getTableId();
+ long dbId = tabletMeta.getDbId();
+ Database db =
Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ // this can happen when dbId == -1 (tablet being dropping) or
db really not exist.
+ continue;
+ }
+ OlapTable tbl = (OlapTable) db.getTableNullable(tableId);
+ if (tbl == null) {
+ // this can happen when tableId == -1 (tablet being dropping)
or table really not exist.
+ continue;
+ }
+ // check relative partition restore here
+ long partitionId = tabletMeta.getPartitionId();
+ if (tbl.getPartition(partitionId) == null) {
+ // this can happen when partitionId == -1 (tablet being
dropping) or partition really not exist.
+ continue;
+ }
+ tableToPartition.computeIfAbsent(tbl, k ->
Sets.newHashSet()).add(partitionId);
+ commitIndexIds.add(tabletMeta.getIndexId());
+ }
+
+ for (OlapTable tbl : tableToPartition.keySet()) {
+ for (Partition partition : tbl.getAllPartitions()) {
+ if (!tableToPartition.get(tbl).contains(partition.getId())) {
+ continue;
+ }
+ List<MaterializedIndex> allIndices
+ =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+ for (MaterializedIndex index : allIndices) {
+ // Schema change during load will increase partition index
number,
+ // and we need to skip these indexes.
+ // TODO: judge by transactionState.getLoadedTblIndexes()
is better
+ if (!commitIndexIds.contains(index.getId())) {
+ continue;
+ }
+ for (Tablet tablet : index.getTablets()) {
+ if (!commitTabletIds.contains(tablet.getId())) {
+ throw new LoadException("Table [" + tbl.getName()
+ "], Index ["
+ + index.getId() + "], Partition [" +
partition.getName()
+ + "], tablet " + tablet.getId() + " should
be committed");
+ }
+ }
+ }
+ }
+ }
+ }
+
// return mow tables with contains tablet commit info
private List<OlapTable> getMowTableList(List<Table> tableList,
List<TabletCommitInfo> tabletCommitInfos) {
if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
diff --git
a/regression-test/suites/fault_injection_p0/test_incomplete_commit_info.groovy
b/regression-test/suites/fault_injection_p0/test_incomplete_commit_info.groovy
new file mode 100644
index 00000000000..2b6503a929e
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_incomplete_commit_info.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_incomplete_commit_info", "nonConcurrent") {
+ try {
+ def tableName = "test_incomplete_commit_info"
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" =
"1")
+ """
+
GetDebugPoint().enableDebugPointForAllBEs("VNodeChannel.add_block_success_callback.incomplete_commit_info")
+ streamLoad {
+ table "${tableName}"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ }
+ }
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("VNodeChannel.add_block_success_callback.incomplete_commit_info")
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]