This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

The following commit(s) were added to refs/heads/master by this push:
     new a015cd0  [Alter] Change table's state right after all rollup jobs being cancelled
a015cd0 is described below

commit a015cd0c8bae9a561855a53d4f6df99be0d8ce37
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Feb 19 19:45:35 2020 +0800

    [Alter] Change table's state right after all rollup jobs being cancelled
---
 .../doris/alter/MaterializedViewHandler.java       | 26 +++++---
 .../org/apache/doris/alter/BatchRollupJobTest.java | 70 ++++++++++++++++++++--
 2 files changed, 84 insertions(+), 12 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index e4eaabb..6f8f482 100644
--- a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -740,13 +740,13 @@ public class MaterializedViewHandler extends AlterHandler {
         return new HashMap<>(alterJobsV2);
     }
 
-    private void removeJobFromRunningQueue(RollupJobV2 rollupJobV2) {
+    private void removeJobFromRunningQueue(AlterJobV2 alterJob) {
         synchronized (tableRunningJobMap) {
-            Set<Long> runningJobIdSet = tableRunningJobMap.get(rollupJobV2.getTableId());
+            Set<Long> runningJobIdSet = tableRunningJobMap.get(alterJob.getTableId());
             if (runningJobIdSet != null) {
-                runningJobIdSet.remove(rollupJobV2.getJobId());
+                runningJobIdSet.remove(alterJob.getJobId());
                 if (runningJobIdSet.size() == 0) {
-                    tableRunningJobMap.remove(rollupJobV2.getTableId());
+                    tableRunningJobMap.remove(alterJob.getTableId());
                 }
             }
         }
@@ -838,16 +838,23 @@ public class MaterializedViewHandler extends AlterHandler {
             // ATTN(cmy): there is still a short gap between "job finish" and "table become normal",
             // so if user send next alter job right after the "job finish",
             // it may encounter "table's state not NORMAL" error.
+
             if (alterJob.isDone()) {
-                removeJobFromRunningQueue(alterJob);
-                if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) {
-                    changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
-                }
+                onJobDone(alterJob);
                 continue;
             }
         }
     }
 
+    // remove job from running queue and state map, also set table's state to NORMAL if this is
+    // the last running job of the table.
+    private void onJobDone(AlterJobV2 alterJob) {
+        removeJobFromRunningQueue(alterJob);
+        if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) {
+            changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
+        }
+    }
+
     @Deprecated
     private void runOldAlterJob() {
         List<AlterJob> cancelledJobs = Lists.newArrayList();
@@ -1105,6 +1112,9 @@ public class MaterializedViewHandler extends AlterHandler {
         if (rollupJobV2List.size() != 0) {
             for (AlterJobV2 alterJobV2 : rollupJobV2List) {
                 alterJobV2.cancel("user cancelled");
+                if (alterJobV2.isDone()) {
+                    onJobDone(alterJobV2);
+                }
             }
             return;
         }
diff --git a/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java b/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java
index a24e307..594860b 100644
--- a/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java
+++ b/fe/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.alter;
 
 import org.apache.doris.analysis.AlterTableStmt;
+import org.apache.doris.analysis.CancelAlterTableStmt;
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.Catalog;
@@ -29,30 +30,43 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.utframe.UtFrameUtils;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 public class BatchRollupJobTest {
     private static String runningDir = "fe/mocked/BatchRollupJobTest/" + UUID.randomUUID().toString() + "/";
+    private static ConnectContext ctx;
 
     @BeforeClass
     public static void setup() throws Exception {
         UtFrameUtils.createMinDorisCluster(runningDir);
+        ctx = UtFrameUtils.createDefaultCtx();
     }
 
-    @Test
-    public void test() throws Exception {
-        ConnectContext ctx = UtFrameUtils.createDefaultCtx();
+    @Before
+    public void before() throws Exception {
+        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+        alterJobs.clear();
+
         // create database db1
-        String createDbStmtStr = "create database db1;";
+        String createDbStmtStr = "create database if not exists db1;";
         CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
         Catalog.getCurrentCatalog().createDb(createDbStmt);
         System.out.println(Catalog.getCurrentCatalog().getDbNames());
+    }
+
+    @Test
+    public void testBatchRollup() throws Exception {
 
         // create table tbl1
         String createTblStmtStr1 = "create table db1.tbl1(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
         CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr1, ctx);
@@ -96,4 +110,52 @@ public class BatchRollupJobTest {
             Assert.assertEquals(4, partition.getMaterializedIndices(IndexExtState.VISIBLE).size());
         }
     }
+
+    @Test
+    public void testCancelBatchRollup() throws Exception {
+        // create table tbl2
+        String createTblStmtStr1 = "create table db1.tbl2(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
+        CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr1, ctx);
+        Catalog.getCurrentCatalog().createTable(createTableStmt);
+
+        // batch add 3 rollups
+        String stmtStr = "alter table db1.tbl2 add rollup r1(k1) duplicate key(k1), r2(k1, k2) duplicate key(k1), r3(k2) duplicate key(k2);";
+        AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx);
+        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+
+        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+        Assert.assertEquals(3, alterJobs.size());
+        List<Long> jobIds = Lists.newArrayList(alterJobs.keySet());
+
+        Database db = Catalog.getCurrentCatalog().getDb("default_cluster:db1");
+        Assert.assertNotNull(db);
+        OlapTable tbl = (OlapTable) db.getTable("tbl2");
+        Assert.assertNotNull(tbl);
+
+        for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+            if (alterJobV2.getType() != AlterJobV2.JobType.ROLLUP) {
+                continue;
+            }
+            while (!alterJobV2.getJobState().isFinalState()) {
+                System.out.println(
+                        "rollup job " + alterJobV2.getJobId() + " is running. state: " + alterJobV2.getJobState());
+                Thread.sleep(5000);
+            }
+            System.out.println("rollup job " + alterJobV2.getJobId() + " is done. state: " + alterJobV2.getJobState());
+            Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+
+            Assert.assertEquals(OlapTableState.ROLLUP, tbl.getState());
+            // cancel rest of rollup jobs
+            stmtStr = "cancel alter table rollup from db1.tbl2 (" + Joiner.on(",").join(jobIds) + ")";
+            CancelAlterTableStmt cancelStmt = (CancelAlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx);
+            Catalog.getCurrentCatalog().cancelAlter(cancelStmt);
+
+            Assert.assertEquals(OlapTableState.NORMAL, tbl.getState());
+            break;
+        }
+
+        for (Partition partition : tbl.getPartitions()) {
+            Assert.assertEquals(2, partition.getMaterializedIndices(IndexExtState.VISIBLE).size());
+        }
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org