This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new fc942c435f7 [feat](binlog) Support drop rollup binlog #44677 (#44743) fc942c435f7 is described below commit fc942c435f7c3b13347a9404bf3f1e41a4206f49 Author: walter <maoch...@selectdb.com> AuthorDate: Fri Nov 29 11:30:37 2024 +0800 [feat](binlog) Support drop rollup binlog #44677 (#44743) cherry pick from #44677 --- .../doris/alter/MaterializedViewHandler.java | 14 +++++------ .../org/apache/doris/binlog/BinlogManager.java | 29 +++++++++++++++------- .../java/org/apache/doris/binlog/DBBinlog.java | 9 +++++++ .../apache/doris/datasource/InternalCatalog.java | 4 +-- .../org/apache/doris/persist/BatchDropInfo.java | 12 +++++++-- .../java/org/apache/doris/persist/DropInfo.java | 20 +++++++++++++-- .../java/org/apache/doris/persist/EditLog.java | 27 ++++++++++++++------ .../doris/persist/DropAndRecoverInfoTest.java | 10 ++++---- gensrc/thrift/FrontendService.thrift | 4 +-- 9 files changed, 92 insertions(+), 37 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 1ed38748e18..b50bec9cb02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -915,14 +915,12 @@ public class MaterializedViewHandler extends AlterHandler { } // drop data in memory - Set<Long> indexIdSet = new HashSet<>(); - Set<String> rollupNameSet = new HashSet<>(); + Map<Long, String> rollupNameMap = new HashMap<>(); for (AlterClause alterClause : dropRollupClauses) { DropRollupClause dropRollupClause = (DropRollupClause) alterClause; String rollupIndexName = dropRollupClause.getRollupName(); long rollupIndexId = dropMaterializedView(rollupIndexName, olapTable); - indexIdSet.add(rollupIndexId); - rollupNameSet.add(rollupIndexName); + rollupNameMap.put(rollupIndexId, rollupIndexName); } // batch log drop rollup operation @@ -930,9 +928,9 @@ public class MaterializedViewHandler extends AlterHandler { long dbId = db.getId(); long tableId = olapTable.getId(); String tableName = olapTable.getName(); - editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, indexIdSet)); + editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, rollupNameMap)); LOG.info("finished drop rollup index[{}] in table[{}]", - String.join("", rollupNameSet), olapTable.getName()); + String.join("", rollupNameMap.values()), olapTable.getName()); } finally { olapTable.writeUnlock(); } @@ -950,8 +948,8 @@ public class MaterializedViewHandler extends AlterHandler { long mvIndexId = dropMaterializedView(mvName, olapTable); // Step3: log drop mv operation EditLog editLog = Env.getCurrentEnv().getEditLog(); - editLog.logDropRollup( - new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), mvIndexId, false, false, 0)); + editLog.logDropRollup(new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), + mvIndexId, mvName, false, false, 0)); LOG.info("finished drop materialized view [{}] in table [{}]", mvName, olapTable.getName()); } catch (MetaNotFoundException e) { if (dropMaterializedViewStmt.isIfExists()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 8886c4b4104..5606ceeffa2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -30,6 +30,7 @@ import org.apache.doris.persist.AlterViewInfo; import org.apache.doris.persist.BarrierLog; import org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.BinlogGcInfo; +import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.ModifyCommentOperationLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; @@ -399,24 +400,34 @@ public class BinlogManager { public void addModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info, long commitSeq) { long dbId = info.getDbId(); - List<Long> tableIds = Lists.newArrayList(); - tableIds.add(info.getTableId()); - long timestamp = -1; + long tableId = info.getTableId(); TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES; String data = info.toJson(); - - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); + BarrierLog log = new BarrierLog(dbId, tableId, type, data); + addBarrierLog(log, commitSeq); } public void addIndexChangeJob(IndexChangeJob indexChangeJob, long commitSeq) { long dbId = indexChangeJob.getDbId(); - List<Long> tableIds = Lists.newArrayList(); - tableIds.add(indexChangeJob.getTableId()); - long timestamp = -1; + long tableId = indexChangeJob.getTableId(); TBinlogType type = TBinlogType.INDEX_CHANGE_JOB; String data = indexChangeJob.toJson(); + BarrierLog log = new BarrierLog(dbId, tableId, type, data); + addBarrierLog(log, commitSeq); + } + + public void addDropRollup(DropInfo info, long commitSeq) { + if (StringUtils.isEmpty(info.getIndexName())) { + LOG.warn("skip drop rollup binlog, because indexName is empty. info: {}", info); + return; + } - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, indexChangeJob); + long dbId = info.getDbId(); + long tableId = info.getTableId(); + TBinlogType type = TBinlogType.DROP_ROLLUP; + String data = info.toJson(); + BarrierLog log = new BarrierLog(dbId, tableId, type, data); + addBarrierLog(log, commitSeq); } // get binlog by dbId, return first binlog.version > version diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index c96e994be91..b78ed389a0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Pair; import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.persist.BarrierLog; +import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.ReplaceTableOperationLog; import org.apache.doris.thrift.TBinlog; @@ -649,6 +650,9 @@ public class DBBinlog { case REPLACE_TABLE: raw = ReplaceTableOperationLog.fromJson(data); break; + case DROP_ROLLUP: + raw = DropInfo.fromJson(data); + break; case BARRIER: raw = BarrierLog.fromJson(data); break; @@ -693,6 +697,11 @@ public class DBBinlog { if (!record.isSwapTable()) { droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq)); } + } else if (binlogType == TBinlogType.DROP_ROLLUP && raw instanceof DropInfo) { + long indexId = ((DropInfo) raw).getIndexId(); + if (indexId > 0) { + droppedIndexes.add(Pair.of(indexId, commitSeq)); + } } else if (binlogType == TBinlogType.BARRIER && raw instanceof BarrierLog) { BarrierLog log = (BarrierLog) raw; // keep compatible with doris 2.0/2.1 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 11ada7df69f..aab70f86733 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -972,7 +972,7 @@ public class InternalCatalog implements CatalogIf<Database> { Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(), db.getId(), table.getId()); - DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, isView, forceDrop, recycleTime); + DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, isView, forceDrop, recycleTime); Env.getCurrentEnv().getEditLog().logDropTable(info); Env.getCurrentEnv().getMtmvService().dropTable(table); } @@ -2989,7 +2989,7 @@ public class InternalCatalog implements CatalogIf<Database> { try { dropTable(db, tableId, true, false, 0L); if (hadLogEditCreateTable) { - DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), -1L, false, true, 0L); + DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), false, true, 0L); Env.getCurrentEnv().getEditLog().logDropTable(info); } } catch (Exception ex) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java index fdfc44e27bb..260ad316d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java @@ -26,6 +26,7 @@ import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -43,12 +44,15 @@ public class BatchDropInfo implements Writable { private String tableName; // not used in equals and hashCode @SerializedName(value = "indexIdSet") private Set<Long> indexIdSet; + @SerializedName(value = "indexNameMap") + private Map<Long, String> indexNameMap; // not used in equals and hashCode - public BatchDropInfo(long dbId, long tableId, String tableName, Set<Long> indexIdSet) { + public BatchDropInfo(long dbId, long tableId, String tableName, Map<Long, String> indexNameMap) { this.dbId = dbId; this.tableId = tableId; this.tableName = tableName; - this.indexIdSet = indexIdSet; + this.indexIdSet = indexNameMap.keySet(); + this.indexNameMap = indexNameMap; } @Override @@ -82,6 +86,10 @@ public class BatchDropInfo implements Writable { return indexIdSet; } + public Map<Long, String> getIndexNameMap() { + return indexNameMap; + } + public long getDbId() { return dbId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java index 461f3ddd67d..69994caf23d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java @@ -38,6 +38,8 @@ public class DropInfo implements Writable { private String tableName; // not used in equals and hashCode @SerializedName(value = "indexId") private long indexId; + @SerializedName(value = "indexName") + private String indexName; // not used in equals and hashCode @SerializedName(value = "isView") private boolean isView = false; @SerializedName(value = "forceDrop") @@ -48,12 +50,18 @@ public class DropInfo implements Writable { public DropInfo() { } - public DropInfo(long dbId, long tableId, String tableName, long indexId, boolean isView, boolean forceDrop, - long recycleTime) { + public DropInfo(long dbId, long tableId, String tableName, boolean isView, boolean forceDrop, + long recycleTime) { + this(dbId, tableId, tableName, -1, "", isView, forceDrop, recycleTime); + } + + public DropInfo(long dbId, long tableId, String tableName, long indexId, String indexName, boolean isView, + boolean forceDrop, long recycleTime) { this.dbId = dbId; this.tableId = tableId; this.tableName = tableName; this.indexId = indexId; + this.indexName = indexName; this.isView = isView; this.forceDrop = forceDrop; this.recycleTime = recycleTime; @@ -75,6 +83,10 @@ public class DropInfo implements Writable { return this.indexId; } + public String getIndexName() { + return this.indexName; + } + public boolean isView() { return this.isView; } @@ -133,4 +145,8 @@ public class DropInfo implements Writable { public String toJson() { return GsonUtils.GSON.toJson(this); } + + public static DropInfo fromJson(String json) { + return GsonUtils.GSON.fromJson(json, DropInfo.class); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 261331ac863..3ac992e6edf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -101,6 +101,7 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.List; +import java.util.Map; /** * EditLog maintains a log of the memory modifications. @@ -337,15 +338,18 @@ public class EditLog { case OperationType.OP_DROP_ROLLUP: { DropInfo info = (DropInfo) journal.getData(); env.getMaterializedViewHandler().replayDropRollup(info, env); + env.getBinlogManager().addDropRollup(info, logId); break; } case OperationType.OP_BATCH_DROP_ROLLUP: { BatchDropInfo batchDropInfo = (BatchDropInfo) journal.getData(); - for (long indexId : batchDropInfo.getIndexIdSet()) { - env.getMaterializedViewHandler().replayDropRollup( - new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(), - batchDropInfo.getTableName(), indexId, false, false, 0), - env); + for (Map.Entry<Long, String> entry : batchDropInfo.getIndexNameMap().entrySet()) { + long indexId = entry.getKey(); + String indexName = entry.getValue(); + DropInfo info = new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(), + batchDropInfo.getTableName(), indexId, indexName, false, false, 0); + env.getMaterializedViewHandler().replayDropRollup(info, env); + env.getBinlogManager().addDropRollup(info, logId); } break; } @@ -1418,11 +1422,20 @@ public class EditLog { } public void logDropRollup(DropInfo info) { - logEdit(OperationType.OP_DROP_ROLLUP, info); + long logId = logEdit(OperationType.OP_DROP_ROLLUP, info); + Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId); } public void logBatchDropRollup(BatchDropInfo batchDropInfo) { - logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo); + long logId = logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo); + for (Map.Entry<Long, String> entry : batchDropInfo.getIndexNameMap().entrySet()) { + DropInfo info = new DropInfo(batchDropInfo.getDbId(), + batchDropInfo.getTableId(), + batchDropInfo.getTableName(), + entry.getKey(), entry.getValue(), + false, true, 0); + Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId); + } } public void logFinishConsistencyCheck(ConsistencyCheckInfo info) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java index 88aa22ded22..8c74fba2753 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java @@ -44,7 +44,7 @@ public class DropAndRecoverInfoTest { DropInfo info1 = new DropInfo(); info1.write(dos); - DropInfo info2 = new DropInfo(1, 2, "t2", -1, false, true, 0); + DropInfo info2 = new DropInfo(1, 2, "t2", -1, "", false, true, 0); info2.write(dos); dos.flush(); @@ -65,10 +65,10 @@ public class DropAndRecoverInfoTest { Assert.assertEquals(rInfo2, rInfo2); Assert.assertNotEquals(rInfo2, this); - Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, false, true, 0)); - Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, false, true, 0)); - Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, false, false, 0)); - Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, false, true, 0)); + Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, "", false, true, 0)); + Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, "", false, true, 0)); + Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false, false, 0)); + Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false, true, 0)); // 3. delete files dis.close(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 95f41158e94..f8edb5d544b 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1146,6 +1146,7 @@ enum TBinlogType { INDEX_CHANGE_JOB = 20, RENAME_ROLLUP = 21, RENAME_PARTITION = 22, + DROP_ROLLUP = 23, // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking @@ -1162,8 +1163,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 23, - UNKNOWN_8 = 24, + MIN_UNKNOWN = 24, UNKNOWN_9 = 25, UNKNOWN_10 = 26, UNKNOWN_11 = 27, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org