This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 92d8f6a [Alter] Allow submitting alter jobs when table is unstable 92d8f6a is described below commit 92d8f6ae78a5c8d2bbf2bbdce68b7b75d237ef20 Author: WingC <1018957...@qq.com> AuthorDate: Sat Jan 18 22:56:37 2020 +0800 [Alter] Allow submitting alter jobs when table is unstable The alter job will wait for the table to become stable before running. --- fe/src/main/java/org/apache/doris/alter/Alter.java | 5 +- .../java/org/apache/doris/alter/RollupJobV2.java | 11 +- .../apache/doris/alter/SchemaChangeHandler.java | 6 +- .../org/apache/doris/alter/SchemaChangeJobV2.java | 14 +- .../java/org/apache/doris/clone/TabletChecker.java | 153 ++++++++++----------- .../org/apache/doris/alter/RollupJobV2Test.java | 88 +++++++++++- .../apache/doris/alter/SchemaChangeJobV2Test.java | 82 +++++++++++ .../org/apache/doris/catalog/CatalogTestUtil.java | 7 +- 8 files changed, 272 insertions(+), 94 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java b/fe/src/main/java/org/apache/doris/alter/Alter.java index 9c2a008..b21b620 100644 --- a/fe/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/src/main/java/org/apache/doris/alter/Alter.java @@ -275,8 +275,9 @@ public class Alter { if (olapTable.getState() != OlapTableState.NORMAL) { throw new DdlException("Table[" + table.getName() + "]'s state is not NORMAL. 
Do not allow doing ALTER ops"); } - - if (needTableStable) { + + // schema change job will wait until table become stable + if (needTableStable && !hasSchemaChange && !hasAddMaterializedView) { // check if all tablets are healthy, and no tablet is in tablet scheduler boolean isStable = olapTable.isStable(Catalog.getCurrentSystemInfo(), Catalog.getCurrentCatalog().getTabletScheduler(), diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java index 2b4cda7..880814d 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -168,8 +168,17 @@ public class RollupJobV2 extends AlterJobV2 { if (tbl == null) { throw new AlterCancelException("Table " + tableId + " does not exist"); } - Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); + boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(), + Catalog.getCurrentCatalog().getTabletScheduler(), + db.getClusterName()); + if (!isStable) { + errMsg = "table is unstable"; + LOG.warn("doing rollup job: " + jobId + " while table is not stable."); + return; + } + + Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) { long partitionId = entry.getKey(); Partition partition = tbl.getPartition(partitionId); diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index fbe36e5..c8b3109 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1310,12 +1310,12 @@ public class SchemaChangeHandler extends AlterHandler { public void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable) throws UserException { // index id -> index schema - Map<Long, 
LinkedList<Column>> indexSchemaMap = new HashMap<Long, LinkedList<Column>>(); + Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<>(); for (Map.Entry<Long, List<Column>> entry : olapTable.getIndexIdToSchema().entrySet()) { - indexSchemaMap.put(entry.getKey(), new LinkedList<Column>(entry.getValue())); + indexSchemaMap.put(entry.getKey(), new LinkedList<>(entry.getValue())); } List<Index> newIndexes = olapTable.getCopiedIndexes(); - Map<String, String> propertyMap = new HashMap<String, String>(); + Map<String, String> propertyMap = new HashMap<>(); for (AlterClause alterClause : alterClauses) { // get properties Map<String, String> properties = alterClause.getProperties(); diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index a8fdc7b..056a80e 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -184,13 +184,23 @@ public class SchemaChangeJobV2 extends AlterJobV2 { totalReplicaNum += tablet.getReplicas().size(); } } - MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalReplicaNum); + MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<>(totalReplicaNum); db.readLock(); try { OlapTable tbl = (OlapTable) db.getTable(tableId); if (tbl == null) { throw new AlterCancelException("Table " + tableId + " does not exist"); - } + } + + boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(), + Catalog.getCurrentCatalog().getTabletScheduler(), + db.getClusterName()); + if (!isStable) { + errMsg = "table is unstable"; + LOG.warn("doing schema change job: " + jobId + " while table is not stable."); + return; + } + Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); for (long partitionId : partitionIndexMap.rowKeySet()) { Partition partition = tbl.getPartition(partitionId); diff --git 
a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java index bae2438..cba2d8a 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -100,6 +100,18 @@ public class TabletChecker extends MasterDaemon { } } + public static class RepairTabletInfo { + public long dbId; + public long tblId; + public List<Long> partIds; + + public RepairTabletInfo(Long dbId, Long tblId, List<Long> partIds) { + this.dbId = dbId; + this.tblId = tblId; + this.partIds = partIds; + } + } + public TabletChecker(Catalog catalog, SystemInfoService infoService, TabletScheduler tabletScheduler, TabletSchedulerStat stat) { super("tablet checker", CHECK_INTERVAL_MS); @@ -109,42 +121,42 @@ public class TabletChecker extends MasterDaemon { this.stat = stat; } - public void addPrios(long dbId, long tblId, List<Long> partitionIds, long timeoutMs) { - Preconditions.checkArgument(!partitionIds.isEmpty()); + private void addPrios(RepairTabletInfo repairTabletInfo, long timeoutMs) { + Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty()); long currentTime = System.currentTimeMillis(); synchronized (prios) { - Set<PrioPart> parts = prios.get(dbId, tblId); + Set<PrioPart> parts = prios.get(repairTabletInfo.dbId, repairTabletInfo.tblId); if (parts == null) { parts = Sets.newHashSet(); - prios.put(dbId, tblId, parts); + prios.put(repairTabletInfo.dbId, repairTabletInfo.tblId, parts); } - for (long partId : partitionIds) { + for (long partId : repairTabletInfo.partIds) { PrioPart prioPart = new PrioPart(partId, currentTime, timeoutMs); parts.add(prioPart); } } // we also need to change the priority of tablets which are already in - tabletScheduler.changeTabletsPriorityToVeryHigh(dbId, tblId, partitionIds); + tabletScheduler.changeTabletsPriorityToVeryHigh(repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds); } - private void 
removePrios(long dbId, long tblId, List<Long> partitionIds) { - Preconditions.checkArgument(!partitionIds.isEmpty()); + private void removePrios(RepairTabletInfo repairTabletInfo) { + Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty()); synchronized (prios) { - Map<Long, Set<PrioPart>> tblMap = prios.row(dbId); + Map<Long, Set<PrioPart>> tblMap = prios.row(repairTabletInfo.dbId); if (tblMap == null) { return; } - Set<PrioPart> parts = tblMap.get(tblId); + Set<PrioPart> parts = tblMap.get(repairTabletInfo.tblId); if (parts == null) { return; } - for (long partId : partitionIds) { + for (long partId : repairTabletInfo.partIds) { parts.remove(new PrioPart(partId, -1, -1)); } if (parts.isEmpty()) { - tblMap.remove(tblId); + tblMap.remove(repairTabletInfo.tblId); } } @@ -271,7 +283,8 @@ public class TabletChecker extends MasterDaemon { // priorities. LOG.debug("partition is healthy, remove from prios: {}-{}-{}", db.getId(), olapTbl.getId(), partition.getId()); - removePrios(db.getId(), olapTbl.getId(), Lists.newArrayList(partition.getId())); + removePrios(new RepairTabletInfo(db.getId(), + olapTbl.getId(), Lists.newArrayList(partition.getId()))); } } // partitions } // tables @@ -356,53 +369,53 @@ public class TabletChecker extends MasterDaemon { * when being scheduled. 
*/ public void repairTable(AdminRepairTableStmt stmt) throws DdlException { - Catalog catalog = Catalog.getCurrentCatalog(); - Database db = catalog.getDb(stmt.getDbName()); - if (db == null) { - throw new DdlException("Database " + stmt.getDbName() + " does not exist"); - } + RepairTabletInfo repairTabletInfo = getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions()); + addPrios(repairTabletInfo, stmt.getTimeoutS()); + LOG.info("repair database: {}, table: {}, partition: {}", repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds); + } - long dbId = db.getId(); - long tblId = -1; - List<Long> partIds = Lists.newArrayList(); - db.readLock(); - try { - Table tbl = db.getTable(stmt.getTblName()); - if (tbl == null || tbl.getType() != TableType.OLAP) { - throw new DdlException("Table does not exist or is not OLAP table: " + stmt.getTblName()); + /* + * handle ADMIN CANCEL REPAIR TABLE stmt send by user. + * This operation will remove the specified partitions from 'prios' + */ + public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws DdlException { + RepairTabletInfo repairTabletInfo = getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions()); + removePrios(repairTabletInfo); + LOG.info("cancel repair database: {}, table: {}, partition: {}", repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds); + } + + public int getPrioPartitionNum() { + int count = 0; + synchronized (prios) { + for (Set<PrioPart> set : prios.values()) { + count += set.size(); } + } + return count; + } - tblId = tbl.getId(); - OlapTable olapTable = (OlapTable) tbl; - if (stmt.getPartitions().isEmpty()) { - partIds = olapTable.getPartitions().stream().map(p -> p.getId()).collect(Collectors.toList()); - } else { - for (String partName : stmt.getPartitions()) { - Partition partition = olapTable.getPartition(partName); - if (partition == null) { - throw new DdlException("Partition does not exist: " + partName); - 
} - partIds.add(partition.getId()); + public List<List<String>> getPriosInfo() { + List<List<String>> infos = Lists.newArrayList(); + synchronized (prios) { + for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) { + for (PrioPart part : cell.getValue()) { + List<String> row = Lists.newArrayList(); + row.add(cell.getRowKey().toString()); + row.add(cell.getColumnKey().toString()); + row.add(String.valueOf(part.partId)); + row.add(String.valueOf(part.timeoutMs - (System.currentTimeMillis() - part.addTime))); + infos.add(row); } } - } finally { - db.readUnlock(); } - - Preconditions.checkState(tblId != -1); - addPrios(dbId, tblId, partIds, stmt.getTimeoutS() * 1000); - LOG.info("repair database: {}, table: {}, partition: {}", dbId, tblId, partIds); + return infos; } - /* - * handle ADMIN CANCEL REPAIR TABLE stmt send by user. - * This operation will remove the specified partitions from 'prios' - */ - public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws DdlException { + public static RepairTabletInfo getRepairTabletInfo(String dbName, String tblName, List<String> partitions) throws DdlException { Catalog catalog = Catalog.getCurrentCatalog(); - Database db = catalog.getDb(stmt.getDbName()); + Database db = catalog.getDb(dbName); if (db == null) { - throw new DdlException("Database " + stmt.getDbName() + " does not exist"); + throw new DdlException("Database " + dbName + " does not exist"); } long dbId = db.getId(); @@ -410,17 +423,18 @@ public class TabletChecker extends MasterDaemon { List<Long> partIds = Lists.newArrayList(); db.readLock(); try { - Table tbl = db.getTable(stmt.getTblName()); + Table tbl = db.getTable(tblName); if (tbl == null || tbl.getType() != TableType.OLAP) { - throw new DdlException("Table does not exist or is not OLAP table: " + stmt.getTblName()); + throw new DdlException("Table does not exist or is not OLAP table: " + tblName); } tblId = tbl.getId(); OlapTable olapTable = (OlapTable) tbl; - if 
(stmt.getPartitions().isEmpty()) { - partIds = olapTable.getPartitions().stream().map(p -> p.getId()).collect(Collectors.toList()); + + if (partitions == null || partitions.isEmpty()) { + partIds = olapTable.getPartitions().stream().map(Partition::getId).collect(Collectors.toList()); } else { - for (String partName : stmt.getPartitions()) { + for (String partName : partitions) { Partition partition = olapTable.getPartition(partName); if (partition == null) { throw new DdlException("Partition does not exist: " + partName); @@ -433,34 +447,7 @@ public class TabletChecker extends MasterDaemon { } Preconditions.checkState(tblId != -1); - removePrios(dbId, tblId, partIds); - LOG.info("cancel repair database: {}, table: {}, partition: {}", dbId, tblId, partIds); - } - - public int getPrioPartitionNum() { - int count = 0; - synchronized (prios) { - for (Set<PrioPart> set : prios.values()) { - count += set.size(); - } - } - return count; - } - public List<List<String>> getPriosInfo() { - List<List<String>> infos = Lists.newArrayList(); - synchronized (prios) { - for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) { - for (PrioPart part : cell.getValue()) { - List<String> row = Lists.newArrayList(); - row.add(cell.getRowKey().toString()); - row.add(cell.getColumnKey().toString()); - row.add(String.valueOf(part.partId)); - row.add(String.valueOf(part.timeoutMs - (System.currentTimeMillis() - part.addTime))); - infos.add(row); - } - } - } - return infos; + return new RepairTabletInfo(dbId, tblId, partIds); } } diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 8f88bc0..d79b8c5 100644 --- a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -58,6 +58,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; + public class 
RollupJobV2Test { private static FakeTransactionIDGenerator fakeTransactionIDGenerator; private static GlobalTransactionMgr masterTransMgr; @@ -69,11 +71,13 @@ public class RollupJobV2Test { private static Analyzer analyzer; private static AddRollupClause clause; - FakeEditLog fakeEditLog; + private FakeCatalog fakeCatalog; + private FakeEditLog fakeEditLog; @Before public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException, AnalysisException { + fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); fakeTransactionIDGenerator = new FakeTransactionIDGenerator(); masterCatalog = CatalogTestUtil.createTestCatalog(); @@ -110,7 +114,9 @@ public class RollupJobV2Test { @Test public void testAddSchemaChange() throws UserException { + fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); MaterializedViewHandler materializedViewHandler = Catalog.getInstance().getRollupHandler(); ArrayList<AlterClause> alterClauses = new ArrayList<>(); alterClauses.add(clause); @@ -125,7 +131,9 @@ public class RollupJobV2Test { // start a schema change, then finished @Test public void testSchemaChange1() throws Exception { + fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); MaterializedViewHandler materializedViewHandler = Catalog.getInstance().getRollupHandler(); // add a rollup job @@ -225,4 +233,82 @@ public class RollupJobV2Test { */ } + @Test + public void testSchemaChangeWhileTabletNotStable() throws Exception { + fakeCatalog = new FakeCatalog(); + fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); + MaterializedViewHandler materializedViewHandler = Catalog.getInstance().getRollupHandler(); + + // add a rollup job + ArrayList<AlterClause> alterClauses = new ArrayList<>(); + alterClauses.add(clause); + Database db = 
masterCatalog.getDb(CatalogTestUtil.testDbId1); + OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1); + Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1); + materializedViewHandler.process(alterClauses, db.getClusterName(), db, olapTable); + Map<Long, AlterJobV2> alterJobsV2 = materializedViewHandler.getAlterJobsV2(); + Assert.assertEquals(1, alterJobsV2.size()); + RollupJobV2 rollupJob = (RollupJobV2) alterJobsV2.values().stream().findAny().get(); + + MaterializedIndex baseIndex = testPartition.getBaseIndex(); + assertEquals(MaterializedIndex.IndexState.NORMAL, baseIndex.getState()); + assertEquals(Partition.PartitionState.NORMAL, testPartition.getState()); + assertEquals(OlapTableState.ROLLUP, olapTable.getState()); + + Tablet baseTablet = baseIndex.getTablets().get(0); + List<Replica> replicas = baseTablet.getReplicas(); + Replica replica1 = replicas.get(0); + Replica replica2 = replicas.get(1); + Replica replica3 = replicas.get(2); + + assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion()); + assertEquals(-1, replica1.getLastFailedVersion()); + assertEquals(-1, replica2.getLastFailedVersion()); + assertEquals(-1, replica3.getLastFailedVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica1.getLastSuccessVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica2.getLastSuccessVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica3.getLastSuccessVersion()); + + // runPendingJob + replica1.setState(Replica.ReplicaState.DECOMMISSION); + materializedViewHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.PENDING, rollupJob.getJobState()); + + // table is stable, runPendingJob again + replica1.setState(Replica.ReplicaState.NORMAL); + materializedViewHandler.runAfterCatalogReady(); + 
Assert.assertEquals(JobState.WAITING_TXN, rollupJob.getJobState()); + Assert.assertEquals(2, testPartition.getMaterializedIndices(IndexExtState.ALL).size()); + Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size()); + Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.SHADOW).size()); + + // runWaitingTxnJob + materializedViewHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState()); + + // runWaitingTxnJob, task not finished + materializedViewHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState()); + + // finish all tasks + List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER); + Assert.assertEquals(3, tasks.size()); + for (AgentTask agentTask : tasks) { + agentTask.setFinished(true); + } + MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); + for (Tablet shadowTablet : shadowIndex.getTablets()) { + for (Replica shadowReplica : shadowTablet.getReplicas()) { + shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), + testPartition.getVisibleVersionHash(), shadowReplica.getDataSize(), + shadowReplica.getRowCount()); + } + } + + materializedViewHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.FINISHED, rollupJob.getJobState()); + } } diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index d462eaa..91a745b 100644 --- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -207,6 +207,88 @@ public class SchemaChangeJobV2Test { } @Test + public void testSchemaChangeWhileTabletNotStable() throws Exception { + fakeCatalog = new FakeCatalog(); + fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); + SchemaChangeHandler schemaChangeHandler = 
Catalog.getInstance().getSchemaChangeHandler(); + + // add a schema change job + ArrayList<AlterClause> alterClauses = new ArrayList<>(); + alterClauses.add(addColumnClause); + Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1); + OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1); + Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1); + schemaChangeHandler.process(alterClauses, "default_cluster", db, olapTable); + Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2(); + Assert.assertEquals(1, alterJobsV2.size()); + SchemaChangeJobV2 schemaChangeJob = (SchemaChangeJobV2) alterJobsV2.values().stream().findAny().get(); + + MaterializedIndex baseIndex = testPartition.getBaseIndex(); + assertEquals(IndexState.NORMAL, baseIndex.getState()); + assertEquals(PartitionState.NORMAL, testPartition.getState()); + assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState()); + + Tablet baseTablet = baseIndex.getTablets().get(0); + List<Replica> replicas = baseTablet.getReplicas(); + Replica replica1 = replicas.get(0); + Replica replica2 = replicas.get(1); + Replica replica3 = replicas.get(2); + + assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion()); + assertEquals(-1, replica1.getLastFailedVersion()); + assertEquals(-1, replica2.getLastFailedVersion()); + assertEquals(-1, replica3.getLastFailedVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica1.getLastSuccessVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica2.getLastSuccessVersion()); + assertEquals(CatalogTestUtil.testStartVersion, replica3.getLastSuccessVersion()); + + // runPendingJob + replica1.setState(Replica.ReplicaState.DECOMMISSION); + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.PENDING, 
schemaChangeJob.getJobState()); + + // table is stable runPendingJob again + replica1.setState(Replica.ReplicaState.NORMAL); + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.WAITING_TXN, schemaChangeJob.getJobState()); + Assert.assertEquals(2, testPartition.getMaterializedIndices(IndexExtState.ALL).size()); + Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size()); + Assert.assertEquals(1, testPartition.getMaterializedIndices(IndexExtState.SHADOW).size()); + + // runWaitingTxnJob + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState()); + + // runWaitingTxnJob, task not finished + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState()); + + // runRunningJob + schemaChangeHandler.runAfterCatalogReady(); + // task not finished, still running + Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState()); + + // finish alter tasks + List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER); + Assert.assertEquals(3, tasks.size()); + for (AgentTask agentTask : tasks) { + agentTask.setFinished(true); + } + MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); + for (Tablet shadowTablet : shadowIndex.getTablets()) { + for (Replica shadowReplica : shadowTablet.getReplicas()) { + shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), testPartition.getVisibleVersionHash(), shadowReplica.getDataSize(), shadowReplica.getRowCount()); + } + } + + schemaChangeHandler.runAfterCatalogReady(); + Assert.assertEquals(JobState.FINISHED, schemaChangeJob.getJobState()); + } + + @Test public void testModifyDynamicPartitionNormal() throws UserException { fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); diff --git a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java 
b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index 172f860..7a1276b 100644 --- a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -89,8 +89,11 @@ public class CatalogTestUtil { catalog.setEditLog(new EditLog("name")); FakeCatalog.setCatalog(catalog); Backend backend1 = createBackend(testBackendId1, "host1", 123, 124, 125); - Backend backend2 = createBackend(testBackendId2, "host1", 123, 124, 125); - Backend backend3 = createBackend(testBackendId3, "host1", 123, 124, 125); + Backend backend2 = createBackend(testBackendId2, "host2", 123, 124, 125); + Backend backend3 = createBackend(testBackendId3, "host3", 123, 124, 125); + backend1.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER); + backend2.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER); + backend3.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER); Catalog.getCurrentSystemInfo().addBackend(backend1); Catalog.getCurrentSystemInfo().addBackend(backend2); Catalog.getCurrentSystemInfo().addBackend(backend3); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org