This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new dcf5f784d8 [fix](catalog) fix bug that replica missing version cause query -214 error (#9266) dcf5f784d8 is described below commit dcf5f784d8681a3aa0665fd8200d761a08015cd4 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Tue May 3 17:54:19 2022 +0800 [fix](catalog) fix bug that replica missing version cause query -214 error (#9266) 1. Fix bug described in #9267 When report missing version replica, set last failed version to (replica version + 1) 2. Skip non-exist partition when handling transactions. --- .../java/org/apache/doris/catalog/Catalog.java | 5 +- .../java/org/apache/doris/catalog/Replica.java | 2 +- .../org/apache/doris/master/ReportHandler.java | 20 +++++--- .../apache/doris/persist/BackendReplicasInfo.java | 18 +++++-- .../doris/transaction/DatabaseTransactionMgr.java | 60 +++++++++------------- .../doris/transaction/GlobalTransactionMgr.java | 4 +- .../doris/transaction/PublishVersionDaemon.java | 6 +-- .../doris/persist/BackendReplicaInfosTest.java | 3 +- .../transaction/GlobalTransactionMgrTest.java | 9 ++-- 9 files changed, 61 insertions(+), 66 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index d0cab71cad..47bbe538e8 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -6975,10 +6975,7 @@ public class Catalog { replica.setBad(true); break; case MISSING_VERSION: - // The absolute value is meaningless, as long as it is greater than 0. - // This way, in other checking logic, if lastFailedVersion is found to be greater than 0, - // it will be considered a version missing replica and will be handled accordingly. 
- replica.setLastFailedVersion(1L); + replica.updateLastFailedVersion(info.lastFailedVersion); break; default: break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index b8660d1848..4ef3b7c071 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -293,7 +293,7 @@ public class Replica implements Writable { if (newVersion < this.version) { // This case means that replica meta version has been updated by ReportHandler before - // For example, the publish version daemon has already sent some publish verison tasks to one be to publish version 2, 3, 4, 5, 6, + // For example, the publish version daemon has already sent some publish version tasks to one be to publish version 2, 3, 4, 5, 6, // and the be finish all publish version tasks, the be's replica version is 6 now, but publish version daemon need to wait // for other be to finish most of publish version tasks to update replica version in fe. 
// At the moment, the replica version in fe is 4, when ReportHandler sync tablet, it find reported replica version in be is 6 and then diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 62cff85f96..d28546d277 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -18,7 +18,6 @@ package org.apache.doris.master; -import com.google.common.collect.Sets; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MaterializedIndex; @@ -80,6 +79,7 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; +import com.google.common.collect.Sets; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.Triple; @@ -703,8 +703,8 @@ public class ReportHandler extends Daemon { AgentTaskExecutor.submit(batchTask); } - LOG.info("delete {} tablet(s) from backend[{}]", deleteFromBackendCounter, backendId); - LOG.info("add {} replica(s) to meta. backend[{}]", addToMetaCounter, backendId); + LOG.info("delete {} tablet(s) and add {} replica(s) to meta from backend[{}]", + deleteFromBackendCounter, addToMetaCounter, backendId); } // replica is used and no version missing @@ -828,11 +828,15 @@ public class ReportHandler extends Daemon { } if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) { - // The absolute value is meaningless, as long as it is greater than 0. - // This way, in other checking logic, if lastFailedVersion is found to be greater than 0, - // it will be considered a version missing replica and will be handled accordingly. 
- replica.setLastFailedVersion(1L); - backendReplicasInfo.addMissingVersionReplica(tabletId); + // If the origin last failed version is larger than 0, not change it. + // Otherwise, we set last failed version to replica'version + 1. + // Because last failed version should always larger than replica's version. + long newLastFailedVersion = replica.getLastFailedVersion(); + if (newLastFailedVersion < 0) { + newLastFailedVersion = replica.getVersion() + 1; + } + replica.updateLastFailedVersion(newLastFailedVersion); + backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion); break; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java index c382e1fdb5..80222f4618 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java @@ -44,11 +44,11 @@ public class BackendReplicasInfo implements Writable { } public void addBadReplica(long tabletId) { - replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.BAD)); + replicaReportInfos.add(new ReplicaReportInfo(tabletId, -1, ReportInfoType.BAD)); } - public void addMissingVersionReplica(long tabletId) { - replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.MISSING_VERSION)); + public void addMissingVersionReplica(long tabletId, long lastFailedVersion) { + replicaReportInfos.add(new ReplicaReportInfo(tabletId, lastFailedVersion, ReportInfoType.MISSING_VERSION)); } public long getBackendId() { @@ -84,9 +84,12 @@ public class BackendReplicasInfo implements Writable { public long tabletId; @SerializedName(value = "type") public ReportInfoType type; + @SerializedName(value = "lastFailedVersion") + public long lastFailedVersion; - public ReplicaReportInfo(long tabletId, ReportInfoType type) { + public ReplicaReportInfo(long tabletId, long lastFailedVersion, 
ReportInfoType type) { this.tabletId = tabletId; + this.lastFailedVersion = lastFailedVersion; this.type = type; } @@ -98,7 +101,12 @@ public class BackendReplicasInfo implements Writable { public static ReplicaReportInfo read(DataInput in) throws IOException { String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, ReplicaReportInfo.class); + ReplicaReportInfo info = GsonUtils.GSON.fromJson(json, ReplicaReportInfo.class); + if (info.type == ReportInfoType.MISSING_VERSION && info.lastFailedVersion <= 0) { + // FIXME(cmy): Just for compatibility, should be remove in v1.2 + info.lastFailedVersion = 1; + } + return info; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 83585ffe32..9ce906fb79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -569,7 +569,7 @@ public class DatabaseTransactionMgr { TxnCommitAttachment txnCommitAttachment, Boolean is2PC) throws UserException { // check status - // the caller method already own db lock, we do not obtain db lock here + // the caller method already own tables' write lock Database db = catalog.getDbOrMetaException(dbId); TransactionState transactionState; readLock(); @@ -644,7 +644,7 @@ public class DatabaseTransactionMgr { LOG.info("transaction:[{}] successfully committed", transactionState); } - public boolean publishTransaction(Database db, long transactionId, long timeoutMillis) throws TransactionCommitFailedException { + public boolean waitForTransactionFinished(Database db, long transactionId, long timeoutMillis) throws TransactionCommitFailedException { TransactionState transactionState = null; readLock(); try { @@ -857,48 +857,24 @@ public class DatabaseTransactionMgr { } } + // check success replica number for each tablet. 
+ // a success replica means: + // 1. Not in errorReplicaIds: succeed in both commit and publish phase + // 2. last failed version < 0: is a health replica before + // 3. version catch up: not with a stale version + // Here we only check number, the replica version will be updated in updateCatalogAfterVisible() for (MaterializedIndex index : allIndices) { for (Tablet tablet : index.getTablets()) { int healthReplicaNum = 0; for (Replica replica : tablet.getReplicas()) { - if (!errorReplicaIds.contains(replica.getId()) - && replica.getLastFailedVersion() < 0) { - // this means the replica is a healthy replica, - // it is healthy in the past and does not have error in current load + if (!errorReplicaIds.contains(replica.getId()) && replica.getLastFailedVersion() < 0) { if (replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) { - // during rollup, the rollup replica's last failed version < 0, - // it may be treated as a normal replica. - // the replica is not failed during commit or publish - // during upgrade, one replica's last version maybe invalid, - // has to compare version hash. - - // Here we still update the replica's info even if we failed to publish - // this txn, for the following case: - // replica A,B,C is successfully committed, but only A is successfully - // published, - // B and C is crashed, now we need a Clone task to repair this tablet. - // So, here we update A's version info, so that clone task will clone - // the latest version of data. 
- - replica.updateVersionInfo(partitionCommitInfo.getVersion(), - replica.getDataSize(), replica.getRowCount()); ++healthReplicaNum; - } else { - // this means the replica has error in the past, but we did not observe it - // during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica - // A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback - // then we will detect this and set C's last failed version to 10 and last success version to 11 - // this logic has to be replayed in checkpoint thread - replica.updateVersionInfo(replica.getVersion(), - partition.getVisibleVersion(), - partitionCommitInfo.getVersion()); - LOG.warn("transaction state {} has error, the replica [{}] not appeared in error replica list " - + " and its version not equal to partition commit version or commit version - 1" - + " if its not a upgrade stage, its a fatal error. ", transactionState, replica); } } else if (replica.getVersion() >= partitionCommitInfo.getVersion()) { // the replica's version is larger than or equal to current transaction partition's version // the replica is normal, then remove it from error replica ids + // TODO(cmy): actually I have no idea why we need this check errorReplicaIds.remove(replica.getId()); ++healthReplicaNum; } @@ -1490,14 +1466,19 @@ public class DatabaseTransactionMgr { for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) { long partitionId = partitionCommitInfo.getPartitionId(); Partition partition = table.getPartition(partitionId); + if (partition == null) { + LOG.warn("partition {} of table {} does not exist when update catalog after committed. 
transaction: {}, db: {}", + partitionId, tableId, transactionState.getTransactionId(), db.getId()); + continue; + } List<MaterializedIndex> allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); for (MaterializedIndex index : allIndices) { List<Tablet> tablets = index.getTablets(); for (Tablet tablet : tablets) { for (Replica replica : tablet.getReplicas()) { if (errorReplicaIds.contains(replica.getId())) { - // should not use partition.getNextVersion and partition.getNextVersionHash because partition's next version hash is generated locally - // should get from transaction state + // TODO(cmy): do we need to update last failed version here? + // because in updateCatalogAfterVisible, it will be updated again. replica.updateLastFailedVersion(partitionCommitInfo.getVersion()); } } @@ -1522,6 +1503,11 @@ public class DatabaseTransactionMgr { long partitionId = partitionCommitInfo.getPartitionId(); long newCommitVersion = partitionCommitInfo.getVersion(); Partition partition = table.getPartition(partitionId); + if (partition == null) { + LOG.warn("partition {} in table {} does not exist when update catalog after visible. 
transaction: {}, db: {}", + partitionId, tableId, transactionState.getTransactionId(), db.getId()); + continue; + } List<MaterializedIndex> allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); for (MaterializedIndex index : allIndices) { for (Tablet tablet : index.getTablets()) { @@ -1531,7 +1517,7 @@ public class DatabaseTransactionMgr { long lastSuccessVersion = replica.getLastSuccessVersion(); if (!errorReplicaIds.contains(replica.getId())) { if (replica.getLastFailedVersion() > 0) { - // if the replica is a failed replica, then not changing version and version hash + // if the replica is a failed replica, then not changing version newVersion = replica.getVersion(); } else if (!replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) { // this means the replica has error in the past, but we did not observe it diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index b51cc9c0d9..9161eb525e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -211,7 +211,7 @@ public class GlobalTransactionMgr implements Writable { * @throws UserException * @throws TransactionCommitFailedException * @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time - * @note callers should get db.write lock before call this api + * @note callers should get all tables' write locks before call this api */ public void commitTransaction(long dbId, List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) @@ -263,7 +263,7 @@ public class GlobalTransactionMgr implements Writable { // so we just return false to indicate publish timeout return false; } - return 
dbTransactionMgr.publishTransaction(db, transactionId, publishTimeoutMillis); + return dbTransactionMgr.waitForTransactionFinished(db, transactionId, publishTimeoutMillis); } public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 99da777962..7b592742f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -79,10 +79,7 @@ public class PublishVersionDaemon extends MasterDaemon { return; } - // TODO yiguolei: could publish transaction state according to multi-tenant cluster info - // but should do more work. for example, if a table is migrate from one cluster to another cluster - // should publish to two clusters. - // attention here, we publish transaction state to all backends including dead backend, if not publish to dead backend + // ATTN, we publish transaction state to all backends including dead backend, if not publish to dead backend // then transaction manager will treat it as success List<Long> allBackends = Catalog.getCurrentSystemInfo().getBackendIds(false); if (allBackends.isEmpty()) { @@ -198,7 +195,6 @@ public class PublishVersionDaemon extends MasterDaemon { continue; } - for (long tableId : transactionState.getTableIdList()) { Table table = db.getTableNullable(tableId); if (table == null || table.getType() != Table.TableType.OLAP) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java index 54ceaf44b1..e5a29a9860 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java @@ -49,7 +49,7 @@ public class BackendReplicaInfosTest { BackendReplicasInfo info = new BackendReplicasInfo(beId); info.addBadReplica(tabletId1); - info.addMissingVersionReplica(tabletId2); + info.addMissingVersionReplica(tabletId2, 11); checkInfo(info); info.write(dos); dos.flush(); @@ -73,6 +73,7 @@ public class BackendReplicaInfosTest { Assert.assertEquals(BackendReplicasInfo.ReportInfoType.BAD, reportInfo.type); } else if (reportInfo.tabletId == tabletId2) { Assert.assertEquals(BackendReplicasInfo.ReportInfoType.MISSING_VERSION, reportInfo.type); + Assert.assertEquals(11, reportInfo.lastFailedVersion); } else { Assert.fail("unknown tablet id: " + reportInfo.tabletId); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 3816f73d72..abdb01f940 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -67,12 +67,13 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import mockit.Injectable; -import mockit.Mocked; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import mockit.Injectable; +import mockit.Mocked; + public class GlobalTransactionMgrTest { private static FakeEditLog fakeEditLog; @@ -526,7 +527,9 @@ public class GlobalTransactionMgrTest { Replica replica1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1); Replica replica2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2); Replica replica3 = tablet.getReplicaById(CatalogTestUtil.testReplicaId3); - assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion()); + // because after calling `finishTransaction`, the txn state is 
COMMITTED, not VISIBLE, + // so all replicas' versions are unchanged. + assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion()); assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion()); assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion()); assertEquals(-1, replica1.getLastFailedVersion()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org