This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 956c2357710 branch-3.0: [feat](binlog) Support drop rollup binlog #44677 (#44714) 956c2357710 is described below commit 956c235771084b2f7ae14997f456e131d5d02491 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Thu Nov 28 17:47:17 2024 +0800 branch-3.0: [feat](binlog) Support drop rollup binlog #44677 (#44714) Cherry-picked from #44677 Co-authored-by: walter <maoch...@selectdb.com> --- .../doris/alter/MaterializedViewHandler.java | 16 ++++++------- .../org/apache/doris/binlog/BinlogManager.java | 17 ++++++++++++++ .../java/org/apache/doris/binlog/DBBinlog.java | 9 ++++++++ .../apache/doris/datasource/InternalCatalog.java | 4 ++-- .../org/apache/doris/persist/BatchDropInfo.java | 12 ++++++++-- .../java/org/apache/doris/persist/DropInfo.java | 20 ++++++++++++++-- .../java/org/apache/doris/persist/EditLog.java | 27 ++++++++++++++++------ .../doris/persist/DropAndRecoverInfoTest.java | 10 ++++---- gensrc/thrift/FrontendService.thrift | 4 ++-- 9 files changed, 90 insertions(+), 29 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index d02e91a379f..a6f1cae9987 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -944,14 +944,12 @@ public class MaterializedViewHandler extends AlterHandler { } // drop data in memory - Set<Long> indexIdSet = new HashSet<>(); - Set<String> rollupNameSet = new HashSet<>(); + Map<Long, String> rollupNameMap = new HashMap<>(); for (AlterClause alterClause : dropRollupClauses) { DropRollupClause dropRollupClause = (DropRollupClause) alterClause; String rollupIndexName = dropRollupClause.getRollupName(); long rollupIndexId = dropMaterializedView(rollupIndexName, olapTable); - indexIdSet.add(rollupIndexId); - rollupNameSet.add(rollupIndexName); + rollupNameMap.put(rollupIndexId, rollupIndexName); } // batch log drop rollup operation @@ -959,10 +957,10 @@ public class MaterializedViewHandler extends AlterHandler { long dbId = db.getId(); long tableId = olapTable.getId(); String tableName = olapTable.getName(); - editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, indexIdSet)); - deleteIndexList = indexIdSet.stream().collect(Collectors.toList()); + editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, rollupNameMap)); + deleteIndexList = rollupNameMap.keySet().stream().collect(Collectors.toList()); LOG.info("finished drop rollup index[{}] in table[{}]", - String.join("", rollupNameSet), olapTable.getName()); + String.join("", rollupNameMap.values()), olapTable.getName()); } finally { olapTable.writeUnlock(); } @@ -982,8 +980,8 @@ public class MaterializedViewHandler extends AlterHandler { long mvIndexId = dropMaterializedView(mvName, olapTable); // Step3: log drop mv operation EditLog editLog = Env.getCurrentEnv().getEditLog(); - editLog.logDropRollup( - new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), mvIndexId, false, false, 0)); + editLog.logDropRollup(new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), + mvIndexId, mvName, false, false, 0)); deleteIndexList.add(mvIndexId); LOG.info("finished drop materialized view [{}] in table [{}]", mvName, olapTable.getName()); } catch (MetaNotFoundException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 3a033c98103..67bb99a8bcd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -30,6 +30,7 @@ import org.apache.doris.persist.AlterViewInfo; import org.apache.doris.persist.BarrierLog; import org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.BinlogGcInfo; +import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.ModifyCommentOperationLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; @@ -429,6 +430,22 @@ public class BinlogManager { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, indexChangeJob); } + public void addDropRollup(DropInfo info, long commitSeq) { + if (StringUtils.isEmpty(info.getIndexName())) { + LOG.warn("skip drop rollup binlog, because indexName is empty. info: {}", info); + return; + } + + long dbId = info.getDbId(); + List<Long> tableIds = Lists.newArrayList(); + tableIds.add(info.getTableId()); + long timestamp = -1; + TBinlogType type = TBinlogType.DROP_ROLLUP; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); + } + // get binlog by dbId, return first binlog.version > version public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index c96e994be91..b78ed389a0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Pair; import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.persist.BarrierLog; +import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.ReplaceTableOperationLog; import org.apache.doris.thrift.TBinlog; @@ -649,6 +650,9 @@ public class DBBinlog { case REPLACE_TABLE: raw = ReplaceTableOperationLog.fromJson(data); break; + case DROP_ROLLUP: + raw = DropInfo.fromJson(data); + break; case BARRIER: raw = BarrierLog.fromJson(data); break; @@ -693,6 +697,11 @@ public class DBBinlog { if (!record.isSwapTable()) { droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq)); } + } else if (binlogType == TBinlogType.DROP_ROLLUP && raw instanceof DropInfo) { + long indexId = ((DropInfo) raw).getIndexId(); + if (indexId > 0) { + droppedIndexes.add(Pair.of(indexId, commitSeq)); + } } else if (binlogType == TBinlogType.BARRIER && raw instanceof BarrierLog) { BarrierLog log = (BarrierLog) raw; // keep compatible with doris 2.0/2.1 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index c7877c925db..53641d70c3e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1010,7 +1010,7 @@ public class InternalCatalog implements CatalogIf<Database> { Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(), db.getId(), table.getId()); - DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, isView, forceDrop, recycleTime); + DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, isView, forceDrop, recycleTime); Env.getCurrentEnv().getEditLog().logDropTable(info); Env.getCurrentEnv().getMtmvService().dropTable(table); } @@ -3231,7 +3231,7 @@ public class InternalCatalog implements CatalogIf<Database> { try { dropTable(db, tableId, true, false, 0L); if (hadLogEditCreateTable) { - DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), -1L, false, true, 0L); + DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), false, true, 0L); Env.getCurrentEnv().getEditLog().logDropTable(info); } } catch (Exception ex) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java index fdfc44e27bb..260ad316d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java @@ -26,6 +26,7 @@ import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -43,12 +44,15 @@ public class BatchDropInfo implements Writable { private String tableName; // not used in equals and hashCode @SerializedName(value = "indexIdSet") private Set<Long> indexIdSet; + @SerializedName(value = "indexNameMap") + private Map<Long, String> indexNameMap; // not used in equals and hashCode - public BatchDropInfo(long dbId, long tableId, String tableName, Set<Long> indexIdSet) { + public BatchDropInfo(long dbId, long tableId, String tableName, Map<Long, String> indexNameMap) { this.dbId = dbId; this.tableId = tableId; this.tableName = tableName; - this.indexIdSet = indexIdSet; + this.indexIdSet = indexNameMap.keySet(); + this.indexNameMap = indexNameMap; } @Override @@ -82,6 +86,10 @@ public class BatchDropInfo implements Writable { return indexIdSet; } + public Map<Long, String> getIndexNameMap() { + return indexNameMap; + } + public long getDbId() { return dbId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java index 461f3ddd67d..69994caf23d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java @@ -38,6 +38,8 @@ public class DropInfo implements Writable { private String tableName; // not used in equals and hashCode @SerializedName(value = "indexId") private long indexId; + @SerializedName(value = "indexName") + private String indexName; // not used in equals and hashCode @SerializedName(value = "isView") private boolean isView = false; @SerializedName(value = "forceDrop") @@ -48,12 +50,18 @@ public class DropInfo implements Writable { public DropInfo() { } - public DropInfo(long dbId, long tableId, String tableName, long indexId, boolean isView, boolean forceDrop, - long recycleTime) { + public DropInfo(long dbId, long tableId, String tableName, boolean isView, boolean forceDrop, + long recycleTime) { + this(dbId, tableId, tableName, -1, "", isView, forceDrop, recycleTime); + } + + public DropInfo(long dbId, long tableId, String tableName, long indexId, String indexName, boolean isView, + boolean forceDrop, long recycleTime) { this.dbId = dbId; this.tableId = tableId; this.tableName = tableName; this.indexId = indexId; + this.indexName = indexName; this.isView = isView; this.forceDrop = forceDrop; this.recycleTime = recycleTime; @@ -75,6 +83,10 @@ public class DropInfo implements Writable { return this.indexId; } + public String getIndexName() { + return this.indexName; + } + public boolean isView() { return this.isView; } @@ -133,4 +145,8 @@ public class DropInfo implements Writable { public String toJson() { return GsonUtils.GSON.toJson(this); } + + public static DropInfo fromJson(String json) { + return GsonUtils.GSON.fromJson(json, DropInfo.class); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 1e70eb634b2..7d1f2127eec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -105,6 +105,7 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.List; +import java.util.Map; /** * EditLog maintains a log of the memory modifications. @@ -341,15 +342,18 @@ public class EditLog { case OperationType.OP_DROP_ROLLUP: { DropInfo info = (DropInfo) journal.getData(); env.getMaterializedViewHandler().replayDropRollup(info, env); + env.getBinlogManager().addDropRollup(info, logId); break; } case OperationType.OP_BATCH_DROP_ROLLUP: { BatchDropInfo batchDropInfo = (BatchDropInfo) journal.getData(); - for (long indexId : batchDropInfo.getIndexIdSet()) { - env.getMaterializedViewHandler().replayDropRollup( - new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(), - batchDropInfo.getTableName(), indexId, false, false, 0), - env); + for (Map.Entry<Long, String> entry : batchDropInfo.getIndexNameMap().entrySet()) { + long indexId = entry.getKey(); + String indexName = entry.getValue(); + DropInfo info = new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(), + batchDropInfo.getTableName(), indexId, indexName, false, false, 0); + env.getMaterializedViewHandler().replayDropRollup(info, env); + env.getBinlogManager().addDropRollup(info, logId); } break; } @@ -1463,11 +1467,20 @@ public class EditLog { } public void logDropRollup(DropInfo info) { - logEdit(OperationType.OP_DROP_ROLLUP, info); + long logId = logEdit(OperationType.OP_DROP_ROLLUP, info); + Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId); } public void logBatchDropRollup(BatchDropInfo batchDropInfo) { - logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo); + long logId = logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo); + for (Map.Entry<Long, String> entry : batchDropInfo.getIndexNameMap().entrySet()) { + DropInfo info = new DropInfo(batchDropInfo.getDbId(), + batchDropInfo.getTableId(), + batchDropInfo.getTableName(), + entry.getKey(), entry.getValue(), + false, true, 0); + Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId); + } } public void logFinishConsistencyCheck(ConsistencyCheckInfo info) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java index 88aa22ded22..8c74fba2753 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java @@ -44,7 +44,7 @@ public class DropAndRecoverInfoTest { DropInfo info1 = new DropInfo(); info1.write(dos); - DropInfo info2 = new DropInfo(1, 2, "t2", -1, false, true, 0); + DropInfo info2 = new DropInfo(1, 2, "t2", -1, "", false, true, 0); info2.write(dos); dos.flush(); @@ -65,10 +65,10 @@ public class DropAndRecoverInfoTest { Assert.assertEquals(rInfo2, rInfo2); Assert.assertNotEquals(rInfo2, this); - Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, false, true, 0)); - Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, false, true, 0)); - Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, false, false, 0)); - Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, false, true, 0)); + Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, "", false, true, 0)); + Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, "", false, true, 0)); + Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false, false, 0)); + Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false, true, 0)); // 3. delete files dis.close(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index d06926e0760..3fb7f6a6515 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1195,6 +1195,7 @@ enum TBinlogType { INDEX_CHANGE_JOB = 20, RENAME_ROLLUP = 21, RENAME_PARTITION = 22, + DROP_ROLLUP = 23, // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking @@ -1211,8 +1212,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 23, - UNKNOWN_8 = 24, + MIN_UNKNOWN = 24, UNKNOWN_9 = 25, UNKNOWN_10 = 26, UNKNOWN_11 = 27, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org