This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new fc942c435f7 [feat](binlog) Support drop rollup binlog #44677 (#44743)
fc942c435f7 is described below

commit fc942c435f7c3b13347a9404bf3f1e41a4206f49
Author: walter <maoch...@selectdb.com>
AuthorDate: Fri Nov 29 11:30:37 2024 +0800

    [feat](binlog) Support drop rollup binlog #44677 (#44743)
    
    cherry pick from #44677
---
 .../doris/alter/MaterializedViewHandler.java       | 14 +++++------
 .../org/apache/doris/binlog/BinlogManager.java     | 29 +++++++++++++++-------
 .../java/org/apache/doris/binlog/DBBinlog.java     |  9 +++++++
 .../apache/doris/datasource/InternalCatalog.java   |  4 +--
 .../org/apache/doris/persist/BatchDropInfo.java    | 12 +++++++--
 .../java/org/apache/doris/persist/DropInfo.java    | 20 +++++++++++++--
 .../java/org/apache/doris/persist/EditLog.java     | 27 ++++++++++++++------
 .../doris/persist/DropAndRecoverInfoTest.java      | 10 ++++----
 gensrc/thrift/FrontendService.thrift               |  4 +--
 9 files changed, 92 insertions(+), 37 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index 1ed38748e18..b50bec9cb02 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -915,14 +915,12 @@ public class MaterializedViewHandler extends AlterHandler 
{
             }
 
             // drop data in memory
-            Set<Long> indexIdSet = new HashSet<>();
-            Set<String> rollupNameSet = new HashSet<>();
+            Map<Long, String> rollupNameMap = new HashMap<>();
             for (AlterClause alterClause : dropRollupClauses) {
                 DropRollupClause dropRollupClause = (DropRollupClause) 
alterClause;
                 String rollupIndexName = dropRollupClause.getRollupName();
                 long rollupIndexId = dropMaterializedView(rollupIndexName, 
olapTable);
-                indexIdSet.add(rollupIndexId);
-                rollupNameSet.add(rollupIndexName);
+                rollupNameMap.put(rollupIndexId, rollupIndexName);
             }
 
             // batch log drop rollup operation
@@ -930,9 +928,9 @@ public class MaterializedViewHandler extends AlterHandler {
             long dbId = db.getId();
             long tableId = olapTable.getId();
             String tableName = olapTable.getName();
-            editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, 
tableName, indexIdSet));
+            editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, 
tableName, rollupNameMap));
             LOG.info("finished drop rollup index[{}] in table[{}]",
-                    String.join("", rollupNameSet), olapTable.getName());
+                    String.join("", rollupNameMap.values()), 
olapTable.getName());
         } finally {
             olapTable.writeUnlock();
         }
@@ -950,8 +948,8 @@ public class MaterializedViewHandler extends AlterHandler {
             long mvIndexId = dropMaterializedView(mvName, olapTable);
             // Step3: log drop mv operation
             EditLog editLog = Env.getCurrentEnv().getEditLog();
-            editLog.logDropRollup(
-                    new DropInfo(db.getId(), olapTable.getId(), 
olapTable.getName(), mvIndexId, false, false, 0));
+            editLog.logDropRollup(new DropInfo(db.getId(), olapTable.getId(), 
olapTable.getName(),
+                    mvIndexId, mvName, false, false, 0));
             LOG.info("finished drop materialized view [{}] in table [{}]", 
mvName, olapTable.getName());
         } catch (MetaNotFoundException e) {
             if (dropMaterializedViewStmt.isIfExists()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 8886c4b4104..5606ceeffa2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -30,6 +30,7 @@ import org.apache.doris.persist.AlterViewInfo;
 import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.persist.BatchModifyPartitionsInfo;
 import org.apache.doris.persist.BinlogGcInfo;
+import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.DropPartitionInfo;
 import org.apache.doris.persist.ModifyCommentOperationLog;
 import org.apache.doris.persist.ModifyTablePropertyOperationLog;
@@ -399,24 +400,34 @@ public class BinlogManager {
 
     public void 
addModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info, 
long commitSeq) {
         long dbId = info.getDbId();
-        List<Long> tableIds = Lists.newArrayList();
-        tableIds.add(info.getTableId());
-        long timestamp = -1;
+        long tableId = info.getTableId();
         TBinlogType type = 
TBinlogType.MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES;
         String data = info.toJson();
-
-        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, 
info);
+        BarrierLog log = new BarrierLog(dbId, tableId, type, data);
+        addBarrierLog(log, commitSeq);
     }
 
     public void addIndexChangeJob(IndexChangeJob indexChangeJob, long 
commitSeq) {
         long dbId = indexChangeJob.getDbId();
-        List<Long> tableIds = Lists.newArrayList();
-        tableIds.add(indexChangeJob.getTableId());
-        long timestamp = -1;
+        long tableId = indexChangeJob.getTableId();
         TBinlogType type = TBinlogType.INDEX_CHANGE_JOB;
         String data = indexChangeJob.toJson();
+        BarrierLog log = new BarrierLog(dbId, tableId, type, data);
+        addBarrierLog(log, commitSeq);
+    }
+
+    public void addDropRollup(DropInfo info, long commitSeq) {
+        if (StringUtils.isEmpty(info.getIndexName())) {
+            LOG.warn("skip drop rollup binlog, because indexName is empty. 
info: {}", info);
+            return;
+        }
 
-        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, 
indexChangeJob);
+        long dbId = info.getDbId();
+        long tableId = info.getTableId();
+        TBinlogType type = TBinlogType.DROP_ROLLUP;
+        String data = info.toJson();
+        BarrierLog log = new BarrierLog(dbId, tableId, type, data);
+        addBarrierLog(log, commitSeq);
     }
 
     // get binlog by dbId, return first binlog.version > version
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index c96e994be91..b78ed389a0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.persist.BarrierLog;
+import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.DropPartitionInfo;
 import org.apache.doris.persist.ReplaceTableOperationLog;
 import org.apache.doris.thrift.TBinlog;
@@ -649,6 +650,9 @@ public class DBBinlog {
                 case REPLACE_TABLE:
                     raw = ReplaceTableOperationLog.fromJson(data);
                     break;
+                case DROP_ROLLUP:
+                    raw = DropInfo.fromJson(data);
+                    break;
                 case BARRIER:
                     raw = BarrierLog.fromJson(data);
                     break;
@@ -693,6 +697,11 @@ public class DBBinlog {
             if (!record.isSwapTable()) {
                 droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq));
             }
+        } else if (binlogType == TBinlogType.DROP_ROLLUP && raw instanceof 
DropInfo) {
+            long indexId = ((DropInfo) raw).getIndexId();
+            if (indexId > 0) {
+                droppedIndexes.add(Pair.of(indexId, commitSeq));
+            }
         } else if (binlogType == TBinlogType.BARRIER && raw instanceof 
BarrierLog) {
             BarrierLog log = (BarrierLog) raw;
             // keep compatible with doris 2.0/2.1
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 11ada7df69f..aab70f86733 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -972,7 +972,7 @@ public class InternalCatalog implements CatalogIf<Database> 
{
 
         
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
                 db.getId(), table.getId());
-        DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, 
-1L, isView, forceDrop, recycleTime);
+        DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, 
isView, forceDrop, recycleTime);
         Env.getCurrentEnv().getEditLog().logDropTable(info);
         Env.getCurrentEnv().getMtmvService().dropTable(table);
     }
@@ -2989,7 +2989,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             try {
                 dropTable(db, tableId, true, false, 0L);
                 if (hadLogEditCreateTable) {
-                    DropInfo info = new DropInfo(db.getId(), tableId, 
olapTable.getName(), -1L, false, true, 0L);
+                    DropInfo info = new DropInfo(db.getId(), tableId, 
olapTable.getName(), false, true, 0L);
                     Env.getCurrentEnv().getEditLog().logDropTable(info);
                 }
             } catch (Exception ex) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java
index fdfc44e27bb..260ad316d3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java
@@ -26,6 +26,7 @@ import com.google.gson.annotations.SerializedName;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -43,12 +44,15 @@ public class BatchDropInfo implements Writable {
     private String tableName; // not used in equals and hashCode
     @SerializedName(value = "indexIdSet")
     private Set<Long> indexIdSet;
+    @SerializedName(value = "indexNameMap")
+    private Map<Long, String> indexNameMap; // not used in equals and hashCode
 
-    public BatchDropInfo(long dbId, long tableId, String tableName, Set<Long> 
indexIdSet) {
+    public BatchDropInfo(long dbId, long tableId, String tableName, Map<Long, 
String> indexNameMap) {
         this.dbId = dbId;
         this.tableId = tableId;
         this.tableName = tableName;
-        this.indexIdSet = indexIdSet;
+        this.indexIdSet = indexNameMap.keySet();
+        this.indexNameMap = indexNameMap;
     }
 
     @Override
@@ -82,6 +86,10 @@ public class BatchDropInfo implements Writable {
         return indexIdSet;
     }
 
+    public Map<Long, String> getIndexNameMap() {
+        return indexNameMap;
+    }
+
     public long getDbId() {
         return dbId;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java
index 461f3ddd67d..69994caf23d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java
@@ -38,6 +38,8 @@ public class DropInfo implements Writable {
     private String tableName; // not used in equals and hashCode
     @SerializedName(value = "indexId")
     private long indexId;
+    @SerializedName(value = "indexName")
+    private String indexName; // not used in equals and hashCode
     @SerializedName(value = "isView")
     private boolean isView = false;
     @SerializedName(value = "forceDrop")
@@ -48,12 +50,18 @@ public class DropInfo implements Writable {
     public DropInfo() {
     }
 
-    public DropInfo(long dbId, long tableId, String tableName, long indexId, 
boolean isView, boolean forceDrop,
-                    long recycleTime) {
+    public DropInfo(long dbId, long tableId, String tableName, boolean isView, 
boolean forceDrop,
+            long recycleTime) {
+        this(dbId, tableId, tableName, -1, "", isView, forceDrop, recycleTime);
+    }
+
+    public DropInfo(long dbId, long tableId, String tableName, long indexId, 
String indexName, boolean isView,
+            boolean forceDrop, long recycleTime) {
         this.dbId = dbId;
         this.tableId = tableId;
         this.tableName = tableName;
         this.indexId = indexId;
+        this.indexName = indexName;
         this.isView = isView;
         this.forceDrop = forceDrop;
         this.recycleTime = recycleTime;
@@ -75,6 +83,10 @@ public class DropInfo implements Writable {
         return this.indexId;
     }
 
+    public String getIndexName() {
+        return this.indexName;
+    }
+
     public boolean isView() {
         return this.isView;
     }
@@ -133,4 +145,8 @@ public class DropInfo implements Writable {
     public String toJson() {
         return GsonUtils.GSON.toJson(this);
     }
+
+    public static DropInfo fromJson(String json) {
+        return GsonUtils.GSON.fromJson(json, DropInfo.class);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 261331ac863..3ac992e6edf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -101,6 +101,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * EditLog maintains a log of the memory modifications.
@@ -337,15 +338,18 @@ public class EditLog {
                 case OperationType.OP_DROP_ROLLUP: {
                     DropInfo info = (DropInfo) journal.getData();
                     env.getMaterializedViewHandler().replayDropRollup(info, 
env);
+                    env.getBinlogManager().addDropRollup(info, logId);
                     break;
                 }
                 case OperationType.OP_BATCH_DROP_ROLLUP: {
                     BatchDropInfo batchDropInfo = (BatchDropInfo) 
journal.getData();
-                    for (long indexId : batchDropInfo.getIndexIdSet()) {
-                        env.getMaterializedViewHandler().replayDropRollup(
-                                new DropInfo(batchDropInfo.getDbId(), 
batchDropInfo.getTableId(),
-                                        batchDropInfo.getTableName(), indexId, 
false, false, 0),
-                                env);
+                    for (Map.Entry<Long, String> entry : 
batchDropInfo.getIndexNameMap().entrySet()) {
+                        long indexId = entry.getKey();
+                        String indexName = entry.getValue();
+                        DropInfo info = new DropInfo(batchDropInfo.getDbId(), 
batchDropInfo.getTableId(),
+                                        batchDropInfo.getTableName(), indexId, 
indexName, false, false, 0);
+                        
env.getMaterializedViewHandler().replayDropRollup(info, env);
+                        env.getBinlogManager().addDropRollup(info, logId);
                     }
                     break;
                 }
@@ -1418,11 +1422,20 @@ public class EditLog {
     }
 
     public void logDropRollup(DropInfo info) {
-        logEdit(OperationType.OP_DROP_ROLLUP, info);
+        long logId = logEdit(OperationType.OP_DROP_ROLLUP, info);
+        Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
     }
 
     public void logBatchDropRollup(BatchDropInfo batchDropInfo) {
-        logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo);
+        long logId = logEdit(OperationType.OP_BATCH_DROP_ROLLUP, 
batchDropInfo);
+        for (Map.Entry<Long, String> entry : 
batchDropInfo.getIndexNameMap().entrySet()) {
+            DropInfo info = new DropInfo(batchDropInfo.getDbId(),
+                    batchDropInfo.getTableId(),
+                    batchDropInfo.getTableName(),
+                    entry.getKey(), entry.getValue(),
+                    false, true, 0);
+            Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
+        }
     }
 
     public void logFinishConsistencyCheck(ConsistencyCheckInfo info) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
index 88aa22ded22..8c74fba2753 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
@@ -44,7 +44,7 @@ public class DropAndRecoverInfoTest {
         DropInfo info1 = new DropInfo();
         info1.write(dos);
 
-        DropInfo info2 = new DropInfo(1, 2, "t2", -1, false, true, 0);
+        DropInfo info2 = new DropInfo(1, 2, "t2", -1, "", false, true, 0);
         info2.write(dos);
 
         dos.flush();
@@ -65,10 +65,10 @@ public class DropAndRecoverInfoTest {
 
         Assert.assertEquals(rInfo2, rInfo2);
         Assert.assertNotEquals(rInfo2, this);
-        Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, false, 
true, 0));
-        Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, false, 
true, 0));
-        Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, false, 
false, 0));
-        Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, false, true, 
0));
+        Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, "", false, 
true, 0));
+        Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, "", false, 
true, 0));
+        Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false, 
false, 0));
+        Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false, 
true, 0));
 
         // 3. delete files
         dis.close();
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 95f41158e94..f8edb5d544b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1146,6 +1146,7 @@ enum TBinlogType {
   INDEX_CHANGE_JOB = 20,
   RENAME_ROLLUP = 21,
   RENAME_PARTITION = 22,
+  DROP_ROLLUP = 23,
 
   // Keep some IDs for allocation so that when new binlog types are added in 
the
   // future, the changes can be picked back to the old versions without 
breaking
@@ -1162,8 +1163,7 @@ enum TBinlogType {
   //    MODIFY_XXX = 17,
   //    MIN_UNKNOWN = 18,
   //    UNKNOWN_3 = 19,
-  MIN_UNKNOWN = 23,
-  UNKNOWN_8 = 24,
+  MIN_UNKNOWN = 24,
   UNKNOWN_9 = 25,
   UNKNOWN_10 = 26,
   UNKNOWN_11 = 27,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to