This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 92d8f6a  [Alter] Allow submitting alter jobs when table is unstable
92d8f6a is described below

commit 92d8f6ae78a5c8d2bbf2bbdce68b7b75d237ef20
Author: WingC <1018957...@qq.com>
AuthorDate: Sat Jan 18 22:56:37 2020 +0800

    [Alter] Allow submitting alter jobs when table is unstable
    
    The alter job will wait for the table to become stable before running.
---
 fe/src/main/java/org/apache/doris/alter/Alter.java |   5 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   |  11 +-
 .../apache/doris/alter/SchemaChangeHandler.java    |   6 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  14 +-
 .../java/org/apache/doris/clone/TabletChecker.java | 153 ++++++++++-----------
 .../org/apache/doris/alter/RollupJobV2Test.java    |  88 +++++++++++-
 .../apache/doris/alter/SchemaChangeJobV2Test.java  |  82 +++++++++++
 .../org/apache/doris/catalog/CatalogTestUtil.java  |   7 +-
 8 files changed, 272 insertions(+), 94 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/src/main/java/org/apache/doris/alter/Alter.java
index 9c2a008..b21b620 100644
--- a/fe/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/src/main/java/org/apache/doris/alter/Alter.java
@@ -275,8 +275,9 @@ public class Alter {
             if (olapTable.getState() != OlapTableState.NORMAL) {
                 throw new DdlException("Table[" + table.getName() + "]'s state 
is not NORMAL. Do not allow doing ALTER ops");
             }
-            
-            if (needTableStable) {
+
+            // schema change job will wait until table become stable
+            if (needTableStable && !hasSchemaChange && 
!hasAddMaterializedView) {
                 // check if all tablets are healthy, and no tablet is in 
tablet scheduler
                 boolean isStable = 
olapTable.isStable(Catalog.getCurrentSystemInfo(),
                         Catalog.getCurrentCatalog().getTabletScheduler(),
diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 2b4cda7..880814d 100644
--- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -168,8 +168,17 @@ public class RollupJobV2 extends AlterJobV2 {
             if (tbl == null) {
                 throw new AlterCancelException("Table " + tableId + " does not 
exist");
             }
-            Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
 
+            boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(),
+                    Catalog.getCurrentCatalog().getTabletScheduler(),
+                    db.getClusterName());
+            if (!isStable) {
+                errMsg = "table is unstable";
+                LOG.warn("doing rollup job: " + jobId + " while table is not 
stable.");
+                return;
+            }
+
+            Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
             for (Map.Entry<Long, MaterializedIndex> entry : 
this.partitionIdToRollupIndex.entrySet()) {
                 long partitionId = entry.getKey();
                 Partition partition = tbl.getPartition(partitionId);
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index fbe36e5..c8b3109 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1310,12 +1310,12 @@ public class SchemaChangeHandler extends AlterHandler {
     public void process(List<AlterClause> alterClauses, String clusterName, 
Database db, OlapTable olapTable)
             throws UserException {
         // index id -> index schema
-        Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<Long, 
LinkedList<Column>>();
+        Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<>();
         for (Map.Entry<Long, List<Column>> entry : 
olapTable.getIndexIdToSchema().entrySet()) {
-            indexSchemaMap.put(entry.getKey(), new 
LinkedList<Column>(entry.getValue()));
+            indexSchemaMap.put(entry.getKey(), new 
LinkedList<>(entry.getValue()));
         }
         List<Index> newIndexes = olapTable.getCopiedIndexes();
-        Map<String, String> propertyMap = new HashMap<String, String>();
+        Map<String, String> propertyMap = new HashMap<>();
         for (AlterClause alterClause : alterClauses) {
             // get properties
             Map<String, String> properties = alterClause.getProperties();
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index a8fdc7b..056a80e 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -184,13 +184,23 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                 totalReplicaNum += tablet.getReplicas().size();
             }
         }
-        MarkedCountDownLatch<Long, Long> countDownLatch = new 
MarkedCountDownLatch<Long, Long>(totalReplicaNum);
+        MarkedCountDownLatch<Long, Long> countDownLatch = new 
MarkedCountDownLatch<>(totalReplicaNum);
         db.readLock();
         try {
             OlapTable tbl = (OlapTable) db.getTable(tableId);
             if (tbl == null) {
                 throw new AlterCancelException("Table " + tableId + " does not 
exist");
-            } 
+            }
+
+            boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(),
+                    Catalog.getCurrentCatalog().getTabletScheduler(),
+                    db.getClusterName());
+            if (!isStable) {
+                errMsg = "table is unstable";
+                LOG.warn("doing schema change job: " + jobId + " while table 
is not stable.");
+                return;
+            }
+
             Preconditions.checkState(tbl.getState() == 
OlapTableState.SCHEMA_CHANGE);
             for (long partitionId : partitionIndexMap.rowKeySet()) {
                 Partition partition = tbl.getPartition(partitionId);
diff --git a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java 
b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java
index bae2438..cba2d8a 100644
--- a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -100,6 +100,18 @@ public class TabletChecker extends MasterDaemon {
         }
     }
 
+    public static class RepairTabletInfo {
+        public long dbId;
+        public long tblId;
+        public List<Long> partIds;
+
+        public RepairTabletInfo(Long dbId, Long tblId, List<Long> partIds) {
+            this.dbId = dbId;
+            this.tblId = tblId;
+            this.partIds = partIds;
+        }
+    }
+
     public TabletChecker(Catalog catalog, SystemInfoService infoService, 
TabletScheduler tabletScheduler,
             TabletSchedulerStat stat) {
         super("tablet checker", CHECK_INTERVAL_MS);
@@ -109,42 +121,42 @@ public class TabletChecker extends MasterDaemon {
         this.stat = stat;
     }
 
-    public void addPrios(long dbId, long tblId, List<Long> partitionIds, long 
timeoutMs) {
-        Preconditions.checkArgument(!partitionIds.isEmpty());
+    private void addPrios(RepairTabletInfo repairTabletInfo, long timeoutMs) {
+        Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty());
         long currentTime = System.currentTimeMillis();
         synchronized (prios) {
-            Set<PrioPart> parts = prios.get(dbId, tblId);
+            Set<PrioPart> parts = prios.get(repairTabletInfo.dbId, 
repairTabletInfo.tblId);
             if (parts == null) {
                 parts = Sets.newHashSet();
-                prios.put(dbId, tblId, parts);
+                prios.put(repairTabletInfo.dbId, repairTabletInfo.tblId, 
parts);
             }
 
-            for (long partId : partitionIds) {
+            for (long partId : repairTabletInfo.partIds) {
                 PrioPart prioPart = new PrioPart(partId, currentTime, 
timeoutMs);
                 parts.add(prioPart);
             }
         }
 
         // we also need to change the priority of tablets which are already in
-        tabletScheduler.changeTabletsPriorityToVeryHigh(dbId, tblId, 
partitionIds);
+        tabletScheduler.changeTabletsPriorityToVeryHigh(repairTabletInfo.dbId, 
repairTabletInfo.tblId, repairTabletInfo.partIds);
     }
 
-    private void removePrios(long dbId, long tblId, List<Long> partitionIds) {
-        Preconditions.checkArgument(!partitionIds.isEmpty());
+    private void removePrios(RepairTabletInfo repairTabletInfo) {
+        Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty());
         synchronized (prios) {
-            Map<Long, Set<PrioPart>> tblMap = prios.row(dbId);
+            Map<Long, Set<PrioPart>> tblMap = prios.row(repairTabletInfo.dbId);
             if (tblMap == null) {
                 return;
             }
-            Set<PrioPart> parts = tblMap.get(tblId);
+            Set<PrioPart> parts = tblMap.get(repairTabletInfo.tblId);
             if (parts == null) {
                 return;
             }
-            for (long partId : partitionIds) {
+            for (long partId : repairTabletInfo.partIds) {
                 parts.remove(new PrioPart(partId, -1, -1));
             }
             if (parts.isEmpty()) {
-                tblMap.remove(tblId);
+                tblMap.remove(repairTabletInfo.tblId);
             }
         }
 
@@ -271,7 +283,8 @@ public class TabletChecker extends MasterDaemon {
                             // priorities.
                             LOG.debug("partition is healthy, remove from 
prios: {}-{}-{}",
                                     db.getId(), olapTbl.getId(), 
partition.getId());
-                            removePrios(db.getId(), olapTbl.getId(), 
Lists.newArrayList(partition.getId()));
+                            removePrios(new RepairTabletInfo(db.getId(),
+                                    olapTbl.getId(), 
Lists.newArrayList(partition.getId())));
                         }
                     } // partitions
                 } // tables
@@ -356,53 +369,53 @@ public class TabletChecker extends MasterDaemon {
      * when being scheduled.
      */
     public void repairTable(AdminRepairTableStmt stmt) throws DdlException {
-        Catalog catalog = Catalog.getCurrentCatalog();
-        Database db = catalog.getDb(stmt.getDbName());
-        if (db == null) {
-            throw new DdlException("Database " + stmt.getDbName() + " does not 
exist");
-        }
+        RepairTabletInfo repairTabletInfo = 
getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions());
+        addPrios(repairTabletInfo, stmt.getTimeoutS());
+        LOG.info("repair database: {}, table: {}, partition: {}", 
repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds);
+    }
 
-        long dbId = db.getId();
-        long tblId = -1;
-        List<Long> partIds = Lists.newArrayList();
-        db.readLock();
-        try {
-            Table tbl = db.getTable(stmt.getTblName());
-            if (tbl == null || tbl.getType() != TableType.OLAP) {
-                throw new DdlException("Table does not exist or is not OLAP 
table: " + stmt.getTblName());
+    /*
+     * handle ADMIN CANCEL REPAIR TABLE stmt send by user.
+     * This operation will remove the specified partitions from 'prios'
+     */
+    public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws 
DdlException {
+        RepairTabletInfo repairTabletInfo = 
getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions());
+        removePrios(repairTabletInfo);
+        LOG.info("cancel repair database: {}, table: {}, partition: {}", 
repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds);
+    }
+
+    public int getPrioPartitionNum() {
+        int count = 0;
+        synchronized (prios) {
+            for (Set<PrioPart> set : prios.values()) {
+                count += set.size();
             }
+        }
+        return count;
+    }
 
-            tblId = tbl.getId();
-            OlapTable olapTable = (OlapTable) tbl;
-            if (stmt.getPartitions().isEmpty()) {
-                partIds = olapTable.getPartitions().stream().map(p -> 
p.getId()).collect(Collectors.toList());
-            } else {
-                for (String partName : stmt.getPartitions()) {
-                    Partition partition = olapTable.getPartition(partName);
-                    if (partition == null) {
-                        throw new DdlException("Partition does not exist: " + 
partName);
-                    }
-                    partIds.add(partition.getId());
+    public List<List<String>> getPriosInfo() {
+        List<List<String>> infos = Lists.newArrayList();
+        synchronized (prios) {
+            for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) {
+                for (PrioPart part : cell.getValue()) {
+                    List<String> row = Lists.newArrayList();
+                    row.add(cell.getRowKey().toString());
+                    row.add(cell.getColumnKey().toString());
+                    row.add(String.valueOf(part.partId));
+                    row.add(String.valueOf(part.timeoutMs - 
(System.currentTimeMillis() - part.addTime)));
+                    infos.add(row);
                 }
             }
-        } finally {
-            db.readUnlock();
         }
-
-        Preconditions.checkState(tblId != -1);
-        addPrios(dbId, tblId, partIds, stmt.getTimeoutS() * 1000);
-        LOG.info("repair database: {}, table: {}, partition: {}", dbId, tblId, 
partIds);
+        return infos;
     }
 
-    /*
-     * handle ADMIN CANCEL REPAIR TABLE stmt send by user.
-     * This operation will remove the specified partitions from 'prios'
-     */
-    public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws 
DdlException {
+    public static RepairTabletInfo getRepairTabletInfo(String dbName, String 
tblName, List<String> partitions) throws DdlException {
         Catalog catalog = Catalog.getCurrentCatalog();
-        Database db = catalog.getDb(stmt.getDbName());
+        Database db = catalog.getDb(dbName);
         if (db == null) {
-            throw new DdlException("Database " + stmt.getDbName() + " does not 
exist");
+            throw new DdlException("Database " + dbName + " does not exist");
         }
 
         long dbId = db.getId();
@@ -410,17 +423,18 @@ public class TabletChecker extends MasterDaemon {
         List<Long> partIds = Lists.newArrayList();
         db.readLock();
         try {
-            Table tbl = db.getTable(stmt.getTblName());
+            Table tbl = db.getTable(tblName);
             if (tbl == null || tbl.getType() != TableType.OLAP) {
-                throw new DdlException("Table does not exist or is not OLAP 
table: " + stmt.getTblName());
+                throw new DdlException("Table does not exist or is not OLAP 
table: " + tblName);
             }
 
             tblId = tbl.getId();
             OlapTable olapTable = (OlapTable) tbl;
-            if (stmt.getPartitions().isEmpty()) {
-                partIds = olapTable.getPartitions().stream().map(p -> 
p.getId()).collect(Collectors.toList());
+
+            if (partitions == null || partitions.isEmpty()) {
+                partIds = 
olapTable.getPartitions().stream().map(Partition::getId).collect(Collectors.toList());
             } else {
-                for (String partName : stmt.getPartitions()) {
+                for (String partName : partitions) {
                     Partition partition = olapTable.getPartition(partName);
                     if (partition == null) {
                         throw new DdlException("Partition does not exist: " + 
partName);
@@ -433,34 +447,7 @@ public class TabletChecker extends MasterDaemon {
         }
 
         Preconditions.checkState(tblId != -1);
-        removePrios(dbId, tblId, partIds);
-        LOG.info("cancel repair database: {}, table: {}, partition: {}", dbId, 
tblId, partIds);
-    }
-
-    public int getPrioPartitionNum() {
-        int count = 0;
-        synchronized (prios) {
-            for (Set<PrioPart> set : prios.values()) {
-                count += set.size();
-            }
-        }
-        return count;
-    }
 
-    public List<List<String>> getPriosInfo() {
-        List<List<String>> infos = Lists.newArrayList();
-        synchronized (prios) {
-            for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) {
-                for (PrioPart part : cell.getValue()) {
-                    List<String> row = Lists.newArrayList();
-                    row.add(cell.getRowKey().toString());
-                    row.add(cell.getColumnKey().toString());
-                    row.add(String.valueOf(part.partId));
-                    row.add(String.valueOf(part.timeoutMs - 
(System.currentTimeMillis() - part.addTime)));
-                    infos.add(row);
-                }
-            }
-        }
-        return infos;
+        return new RepairTabletInfo(dbId, tblId, partIds);
     }
 }
diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java 
b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
index 8f88bc0..d79b8c5 100644
--- a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
@@ -58,6 +58,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+
 public class RollupJobV2Test {
     private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
     private static GlobalTransactionMgr masterTransMgr;
@@ -69,11 +71,13 @@ public class RollupJobV2Test {
     private static Analyzer analyzer;
     private static AddRollupClause clause;
 
-    FakeEditLog fakeEditLog;
+    private FakeCatalog fakeCatalog;
+    private FakeEditLog fakeEditLog;
 
     @Before
     public void setUp() throws InstantiationException, IllegalAccessException, 
IllegalArgumentException,
             InvocationTargetException, NoSuchMethodException, 
SecurityException, AnalysisException {
+        fakeCatalog = new FakeCatalog();
         fakeEditLog = new FakeEditLog();
         fakeTransactionIDGenerator = new FakeTransactionIDGenerator();
         masterCatalog = CatalogTestUtil.createTestCatalog();
@@ -110,7 +114,9 @@ public class RollupJobV2Test {
 
     @Test
     public void testAddSchemaChange() throws UserException {
+        fakeCatalog = new FakeCatalog();
         fakeEditLog = new FakeEditLog();
+        FakeCatalog.setCatalog(masterCatalog);
         MaterializedViewHandler materializedViewHandler = 
Catalog.getInstance().getRollupHandler();
         ArrayList<AlterClause> alterClauses = new ArrayList<>();
         alterClauses.add(clause);
@@ -125,7 +131,9 @@ public class RollupJobV2Test {
     // start a schema change, then finished
     @Test
     public void testSchemaChange1() throws Exception {
+        fakeCatalog = new FakeCatalog();
         fakeEditLog = new FakeEditLog();
+        FakeCatalog.setCatalog(masterCatalog);
         MaterializedViewHandler materializedViewHandler = 
Catalog.getInstance().getRollupHandler();
 
         // add a rollup job
@@ -225,4 +233,82 @@ public class RollupJobV2Test {
         */
     }
 
+    @Test
+    public void testSchemaChangeWhileTabletNotStable() throws Exception {
+        fakeCatalog = new FakeCatalog();
+        fakeEditLog = new FakeEditLog();
+        FakeCatalog.setCatalog(masterCatalog);
+        MaterializedViewHandler materializedViewHandler = 
Catalog.getInstance().getRollupHandler();
+
+        // add a rollup job
+        ArrayList<AlterClause> alterClauses = new ArrayList<>();
+        alterClauses.add(clause);
+        Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
+        OlapTable olapTable = (OlapTable) 
db.getTable(CatalogTestUtil.testTableId1);
+        Partition testPartition = 
olapTable.getPartition(CatalogTestUtil.testPartitionId1);
+        materializedViewHandler.process(alterClauses, db.getClusterName(), db, 
olapTable);
+        Map<Long, AlterJobV2> alterJobsV2 = 
materializedViewHandler.getAlterJobsV2();
+        Assert.assertEquals(1, alterJobsV2.size());
+        RollupJobV2 rollupJob = (RollupJobV2) 
alterJobsV2.values().stream().findAny().get();
+
+        MaterializedIndex baseIndex = testPartition.getBaseIndex();
+        assertEquals(MaterializedIndex.IndexState.NORMAL, 
baseIndex.getState());
+        assertEquals(Partition.PartitionState.NORMAL, 
testPartition.getState());
+        assertEquals(OlapTableState.ROLLUP, olapTable.getState());
+
+        Tablet baseTablet = baseIndex.getTablets().get(0);
+        List<Replica> replicas = baseTablet.getReplicas();
+        Replica replica1 = replicas.get(0);
+        Replica replica2 = replicas.get(1);
+        Replica replica3 = replicas.get(2);
+
+        assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion());
+        assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
+        assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion());
+        assertEquals(-1, replica1.getLastFailedVersion());
+        assertEquals(-1, replica2.getLastFailedVersion());
+        assertEquals(-1, replica3.getLastFailedVersion());
+        assertEquals(CatalogTestUtil.testStartVersion, 
replica1.getLastSuccessVersion());
+        assertEquals(CatalogTestUtil.testStartVersion, 
replica2.getLastSuccessVersion());
+        assertEquals(CatalogTestUtil.testStartVersion, 
replica3.getLastSuccessVersion());
+
+        // runPendingJob
+        replica1.setState(Replica.ReplicaState.DECOMMISSION);
+        materializedViewHandler.runAfterCatalogReady();
+        Assert.assertEquals(JobState.PENDING, rollupJob.getJobState());
+
+        // table is stable, runPendingJob again
+        replica1.setState(Replica.ReplicaState.NORMAL);
+        materializedViewHandler.runAfterCatalogReady();
+        Assert.assertEquals(JobState.WAITING_TXN, rollupJob.getJobState());
+        Assert.assertEquals(2, 
testPartition.getMaterializedIndices(IndexExtState.ALL).size());
+        Assert.assertEquals(1, 
testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size());
+        Assert.assertEquals(1, 
testPartition.getMaterializedIndices(IndexExtState.SHADOW).size());
+
+        // runWaitingTxnJob
+        materializedViewHandler.runAfterCatalogReady();
+        Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState());
+
+        // runWaitingTxnJob, task not finished
+        materializedViewHandler.runAfterCatalogReady();
+        Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState());
+
+        // finish all tasks
+        List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER);
+        Assert.assertEquals(3, tasks.size());
+        for (AgentTask agentTask : tasks) {
+            agentTask.setFinished(true);
+        }
+        MaterializedIndex shadowIndex = 
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
+        for (Tablet shadowTablet : shadowIndex.getTablets()) {
+            for (Replica shadowReplica : shadowTablet.getReplicas()) {
+                
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
+                        testPartition.getVisibleVersionHash(), 
shadowReplica.getDataSize(),
+                        shadowReplica.getRowCount());
+            }
+        }
+
+        materializedViewHandler.runAfterCatalogReady();
+        Assert.assertEquals(JobState.FINISHED, rollupJob.getJobState());
+    }
 }
diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java 
b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index d462eaa..91a745b 100644
--- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -207,6 +207,88 @@ public class SchemaChangeJobV2Test {
     }
 
     @Test
+    public void testSchemaChangeWhileTabletNotStable() throws Exception {
+        fakeCatalog = new FakeCatalog();
+        fakeEditLog = new FakeEditLog();
+        FakeCatalog.setCatalog(masterCatalog);
+        SchemaChangeHandler schemaChangeHandler = 
Catalog.getInstance().getSchemaChangeHandler();
+
+        // add a schema change job
+        ArrayList<AlterClause> alterClauses = new ArrayList<>();
+        alterClauses.add(addColumnClause);
+        Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
+        OlapTable olapTable = (OlapTable) 
db.getTable(CatalogTestUtil.testTableId1);
+        Partition testPartition = 
olapTable.getPartition(CatalogTestUtil.testPartitionId1);
+        schemaChangeHandler.process(alterClauses, "default_cluster", db, 
olapTable);
+        Map<Long, AlterJobV2> alterJobsV2 = 
schemaChangeHandler.getAlterJobsV2();
+        Assert.assertEquals(1, alterJobsV2.size());
+        SchemaChangeJobV2 schemaChangeJob = (SchemaChangeJobV2) 
alterJobsV2.values().stream().findAny().get();
+
+        MaterializedIndex baseIndex = testPartition.getBaseIndex();
+        assertEquals(IndexState.NORMAL, baseIndex.getState());
+        assertEquals(PartitionState.NORMAL, testPartition.getState());
+        assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState());
+
+        Tablet baseTablet = baseIndex.getTablets().get(0);
+        List<Replica> replicas = baseTablet.getReplicas();
+        Replica replica1 = replicas.get(0);
+        Replica replica2 = replicas.get(1);
+        Replica replica3 = replicas.get(2);
+
+        assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion());
+        assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
+        assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion());
+        assertEquals(-1, replica1.getLastFailedVersion());
+        assertEquals(-1, replica2.getLastFailedVersion());
+        assertEquals(-1, replica3.getLastFailedVersion());
+        assertEquals(CatalogTestUtil.testStartVersion, 
replica1.getLastSuccessVersion());
+        assertEquals(CatalogTestUtil.testStartVersion, 
replica2.getLastSuccessVersion());
+        assertEquals(CatalogTestUtil.testStartVersion, 
replica3.getLastSuccessVersion());
+
+        // runPendingJob
+        replica1.setState(Replica.ReplicaState.DECOMMISSION);
+        schemaChangeHandler.runAfterCatalogReady();
+        Assert.assertEquals(JobState.PENDING, schemaChangeJob.getJobState());
+
+        // table is stable runPendingJob again
+        replica1.setState(Replica.ReplicaState.NORMAL);
+        schemaChangeHandler.runAfterCatalogReady();
+        Assert.assertEquals(JobState.WAITING_TXN, 
schemaChangeJob.getJobState());
+        Assert.assertEquals(2, 
testPartition.getMaterializedIndices(IndexExtState.ALL).size());
+        Assert.assertEquals(1, 
testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size());
+        Assert.assertEquals(1, 
testPartition.getMaterializedIndices(IndexExtState.SHADOW).size());
+
+        // runWaitingTxnJob
+        schemaChangeHandler.runAfterCatalogReady();
+        Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState());
+
+        // runWaitingTxnJob, task not finished
+        schemaChangeHandler.runAfterCatalogReady();
+        Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState());
+
+        // runRunningJob
+        schemaChangeHandler.runAfterCatalogReady();
+        // task not finished, still running
+        Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState());
+
+        // finish alter tasks
+        List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER);
+        Assert.assertEquals(3, tasks.size());
+        for (AgentTask agentTask : tasks) {
+            agentTask.setFinished(true);
+        }
+        MaterializedIndex shadowIndex = 
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
+        for (Tablet shadowTablet : shadowIndex.getTablets()) {
+            for (Replica shadowReplica : shadowTablet.getReplicas()) {
+                
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), 
testPartition.getVisibleVersionHash(), shadowReplica.getDataSize(), 
shadowReplica.getRowCount());
+            }
+        }
+
+        schemaChangeHandler.runAfterCatalogReady();
+        Assert.assertEquals(JobState.FINISHED, schemaChangeJob.getJobState());
+    }
+
+    @Test
     public void testModifyDynamicPartitionNormal() throws UserException {
         fakeCatalog = new FakeCatalog();
         fakeEditLog = new FakeEditLog();
diff --git a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java 
b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index 172f860..7a1276b 100644
--- a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -89,8 +89,11 @@ public class CatalogTestUtil {
         catalog.setEditLog(new EditLog("name"));
         FakeCatalog.setCatalog(catalog);
         Backend backend1 = createBackend(testBackendId1, "host1", 123, 124, 
125);
-        Backend backend2 = createBackend(testBackendId2, "host1", 123, 124, 
125);
-        Backend backend3 = createBackend(testBackendId3, "host1", 123, 124, 
125);
+        Backend backend2 = createBackend(testBackendId2, "host2", 123, 124, 
125);
+        Backend backend3 = createBackend(testBackendId3, "host3", 123, 124, 
125);
+        backend1.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
+        backend2.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
+        backend3.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
         Catalog.getCurrentSystemInfo().addBackend(backend1);
         Catalog.getCurrentSystemInfo().addBackend(backend2);
         Catalog.getCurrentSystemInfo().addBackend(backend3);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to