This is an automated email from the ASF dual-hosted git repository.

hellostephen pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new fa3d6f0ffd8 [feat](binlog) Add replace table binlog #44263 (#44589)
fa3d6f0ffd8 is described below

commit fa3d6f0ffd8632c1dfbfcd8e259583df3de4bb6d
Author: walter <maoch...@selectdb.com>
AuthorDate: Tue Nov 26 17:38:17 2024 +0800

    [feat](binlog) Add replace table binlog #44263 (#44589)
    
    cherry pick from #44263
    
    Co-authored-by: Dongyang Li <lidongy...@selectdb.com>
---
 .../main/java/org/apache/doris/alter/Alter.java    |  2 +-
 .../org/apache/doris/binlog/BinlogManager.java     | 16 +++++++
 .../java/org/apache/doris/binlog/DBBinlog.java     | 53 ++++++++++++++++------
 .../java/org/apache/doris/persist/BarrierLog.java  |  4 ++
 .../java/org/apache/doris/persist/EditLog.java     |  7 ++-
 .../doris/persist/ReplaceTableOperationLog.java    | 28 +++++++++++-
 .../persist/ReplaceTableOperationLogTest.java      |  4 +-
 gensrc/thrift/FrontendService.thrift               |  4 +-
 8 files changed, 98 insertions(+), 20 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 44b83f80654..b7801cc3ba9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -574,7 +574,7 @@ public class Alter {
                 replaceTableInternal(db, origTable, olapNewTbl, swapTable, 
false);
                 // write edit log
                 ReplaceTableOperationLog log = new 
ReplaceTableOperationLog(db.getId(),
-                        origTable.getId(), olapNewTbl.getId(), swapTable);
+                        origTable.getId(), oldTblName, olapNewTbl.getId(), newTblName, swapTable);
                 Env.getCurrentEnv().getEditLog().logReplaceTable(log);
                 LOG.info("finish replacing table {} with table {}, is swap: 
{}", oldTblName, newTblName, swapTable);
             } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index f07ead08648..7107c6b19ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -33,6 +33,7 @@ import org.apache.doris.persist.DropPartitionInfo;
 import org.apache.doris.persist.ModifyCommentOperationLog;
 import org.apache.doris.persist.ModifyTablePropertyOperationLog;
 import org.apache.doris.persist.ReplacePartitionOperationLog;
+import org.apache.doris.persist.ReplaceTableOperationLog;
 import org.apache.doris.persist.TableAddOrDropColumnsInfo;
 import org.apache.doris.persist.TableInfo;
 import org.apache.doris.persist.TableRenameColumnInfo;
@@ -45,6 +46,7 @@ import org.apache.doris.thrift.TStatusCode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -361,6 +363,20 @@ public class BinlogManager {
         addBarrierLog(log, commitSeq);
     }
 
+    public void addReplaceTable(ReplaceTableOperationLog info, long commitSeq) {
+        if (StringUtils.isEmpty(info.getOrigTblName()) || StringUtils.isEmpty(info.getNewTblName())) {
+            LOG.warn("skip replace table binlog, because origTblName or newTblName is empty. info: {}", info);
+            return;
+        }
+
+        long dbId = info.getDbId();
+        long tableId = info.getOrigTblId();
+        TBinlogType type = TBinlogType.REPLACE_TABLE;
+        String data = info.toJson();
+        BarrierLog log = new BarrierLog(dbId, tableId, type, data);
+        addBarrierLog(log, commitSeq);
+    }
+
     // get binlog by dbId, return first binlog.version > version
     public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long 
prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index e2eef7966be..c96e994be91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -22,7 +22,9 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.proc.BaseProcResult;
+import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.ReplaceTableOperationLog;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
@@ -626,19 +628,29 @@ public class DBBinlog {
 
     // A method to record the dropped tables, indexes, and partitions.
     private void recordDroppedResources(TBinlog binlog, Object raw) {
+        recordDroppedResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw);
+    }
+
+    private void recordDroppedResources(TBinlogType binlogType, long commitSeq, String data, Object raw) {
         if (raw == null) {
-            switch (binlog.getType()) {
+            switch (binlogType) {
                 case DROP_PARTITION:
-                    raw = DropPartitionInfo.fromJson(binlog.data);
+                    raw = DropPartitionInfo.fromJson(data);
                     break;
                 case DROP_TABLE:
-                    raw = DropTableRecord.fromJson(binlog.data);
+                    raw = DropTableRecord.fromJson(data);
                     break;
                 case ALTER_JOB:
-                    raw = AlterJobRecord.fromJson(binlog.data);
+                    raw = AlterJobRecord.fromJson(data);
                     break;
                 case TRUNCATE_TABLE:
-                    raw = TruncateTableRecord.fromJson(binlog.data);
+                    raw = TruncateTableRecord.fromJson(data);
+                    break;
+                case REPLACE_TABLE:
+                    raw = ReplaceTableOperationLog.fromJson(data);
+                    break;
+                case BARRIER:
+                    raw = BarrierLog.fromJson(data);
                     break;
                 default:
                     break;
@@ -648,29 +660,44 @@ public class DBBinlog {
             }
         }
 
-        if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof 
DropPartitionInfo) {
+        recordDroppedResources(binlogType, commitSeq, raw);
+    }
+
+    private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Object raw) {
+        if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
             long partitionId = ((DropPartitionInfo) raw).getPartitionId();
             if (partitionId > 0) {
-                droppedPartitions.add(Pair.of(partitionId, 
binlog.getCommitSeq()));
+                droppedPartitions.add(Pair.of(partitionId, commitSeq));
             }
-        } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw 
instanceof DropTableRecord) {
+        } else if (binlogType == TBinlogType.DROP_TABLE && raw instanceof 
DropTableRecord) {
             long tableId = ((DropTableRecord) raw).getTableId();
             if (tableId > 0) {
-                droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
+                droppedTables.add(Pair.of(tableId, commitSeq));
             }
-        } else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof 
AlterJobRecord) {
+        } else if (binlogType == TBinlogType.ALTER_JOB && raw instanceof 
AlterJobRecord) {
             AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
             if (alterJobRecord.isJobFinished() && 
alterJobRecord.isSchemaChangeJob()) {
                 for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
                     if (indexId != null && indexId > 0) {
-                        droppedIndexes.add(Pair.of(indexId, 
binlog.getCommitSeq()));
+                        droppedIndexes.add(Pair.of(indexId, commitSeq));
                     }
                 }
             }
-        } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw 
instanceof TruncateTableRecord) {
+        } else if (binlogType == TBinlogType.TRUNCATE_TABLE && raw instanceof 
TruncateTableRecord) {
             TruncateTableRecord truncateTableRecord = (TruncateTableRecord) 
raw;
             for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
-                droppedPartitions.add(Pair.of(partitionId, 
binlog.getCommitSeq()));
+                droppedPartitions.add(Pair.of(partitionId, commitSeq));
+            }
+        } else if (binlogType == TBinlogType.REPLACE_TABLE && raw instanceof ReplaceTableOperationLog) {
+            ReplaceTableOperationLog record = (ReplaceTableOperationLog) raw;
+            if (!record.isSwapTable()) {
+                droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq));
+            }
+        } else if (binlogType == TBinlogType.BARRIER && raw instanceof BarrierLog) {
+            BarrierLog log = (BarrierLog) raw;
+            // keep compatible with doris 2.0/2.1
+            if (log.hasBinlog()) {
+                recordDroppedResources(log.getBinlogType(), commitSeq, log.getBinlog(), null);
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
index 4a9ce13e03b..86d56fb4a64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
@@ -109,6 +109,10 @@ public class BarrierLog implements Writable {
         return GsonUtils.GSON.toJson(this);
     }
 
+    public static BarrierLog fromJson(String json) {
+        return GsonUtils.GSON.fromJson(json, BarrierLog.class);
+    }
+
     @Override
     public String toString() {
         return toJson();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 3c16ba2f15a..572a07eda2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -314,6 +314,7 @@ public class EditLog {
                 case OperationType.OP_RENAME_PARTITION: {
                     TableInfo info = (TableInfo) journal.getData();
                     env.replayRenamePartition(info);
+                    env.getBinlogManager().addTableRename(info, logId);
                     break;
                 }
                 case OperationType.OP_RENAME_COLUMN: {
@@ -361,6 +362,7 @@ public class EditLog {
                 case OperationType.OP_RENAME_ROLLUP: {
                     TableInfo info = (TableInfo) journal.getData();
                     env.replayRenameRollup(info);
+                    env.getBinlogManager().addTableRename(info, logId);
                     break;
                 }
                 case OperationType.OP_LOAD_START:
@@ -887,6 +889,7 @@ public class EditLog {
                 case OperationType.OP_REPLACE_TABLE: {
                     ReplaceTableOperationLog log = (ReplaceTableOperationLog) 
journal.getData();
                     env.getAlterInstance().replayReplaceTable(log);
+                    env.getBinlogManager().addReplaceTable(log, logId);
                     break;
                 }
                 case OperationType.OP_CREATE_SQL_BLOCK_RULE: {
@@ -1887,7 +1890,9 @@ public class EditLog {
     }
 
     public void logReplaceTable(ReplaceTableOperationLog log) {
-        logEdit(OperationType.OP_REPLACE_TABLE, log);
+        long logId = logEdit(OperationType.OP_REPLACE_TABLE, log);
+        LOG.info("add replace table binlog, logId: {}, infos: {}", logId, log);
+        Env.getCurrentEnv().getBinlogManager().addReplaceTable(log, logId);
     }
 
     public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 
op) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java
index c5b0a05f0e6..3d11de4af43 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java
@@ -32,15 +32,23 @@ public class ReplaceTableOperationLog implements Writable {
     private long dbId;
     @SerializedName(value = "origTblId")
     private long origTblId;
+    @SerializedName(value = "origTblName")
+    private String origTblName;
     @SerializedName(value = "newTblName")
     private long newTblId;
+    @SerializedName(value = "actualNewTblName")
+    private String newTblName;
     @SerializedName(value = "swapTable")
     private boolean swapTable;
 
-    public ReplaceTableOperationLog(long dbId, long origTblId, long newTblId, boolean swapTable) {
+    public ReplaceTableOperationLog(long dbId, long origTblId,
+            String origTblName, long newTblId, String newTblName,
+            boolean swapTable) {
         this.dbId = dbId;
         this.origTblId = origTblId;
+        this.origTblName = origTblName;
         this.newTblId = newTblId;
+        this.newTblName = newTblName;
         this.swapTable = swapTable;
     }
 
@@ -52,21 +60,37 @@ public class ReplaceTableOperationLog implements Writable {
         return origTblId;
     }
 
+    public String getOrigTblName() {
+        return origTblName;
+    }
+
     public long getNewTblId() {
         return newTblId;
     }
 
+    public String getNewTblName() {
+        return newTblName;
+    }
+
     public boolean isSwapTable() {
         return swapTable;
     }
 
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    public static ReplaceTableOperationLog fromJson(String json) {
+        return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class);
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
         Text.writeString(out, json);
     }
 
-    public static ReplaceTableOperationLog read(DataInput in) throws  IOException {
+    public static ReplaceTableOperationLog read(DataInput in) throws IOException {
         String json = Text.readString(in);
         return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class);
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
index ec71c24a13f..dddcacb48a3 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
@@ -34,7 +34,7 @@ public class ReplaceTableOperationLogTest {
         file.createNewFile();
         DataOutputStream dos = new DataOutputStream(new 
FileOutputStream(file));
 
-        ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, 3, 
true);
+        ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, "old", 3, "new", true);
         log.write(dos);
 
         dos.flush();
@@ -48,6 +48,8 @@ public class ReplaceTableOperationLogTest {
         Assert.assertTrue(readLog.getNewTblId() == log.getNewTblId());
         Assert.assertTrue(readLog.getOrigTblId() == log.getOrigTblId());
         Assert.assertTrue(readLog.isSwapTable() == log.isSwapTable());
+        Assert.assertTrue(readLog.getOrigTblName().equals(log.getOrigTblName()));
+        Assert.assertTrue(readLog.getNewTblName().equals(log.getNewTblName()));
 
         // 3. delete files
         dis.close();
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index a36eb8e0de7..6c8ad296724 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1141,6 +1141,7 @@ enum TBinlogType {
   RENAME_COLUMN = 15,
   MODIFY_COMMENT = 16,
   MODIFY_VIEW_DEF = 17,
+  REPLACE_TABLE = 18,
 
   // Keep some IDs for allocation so that when new binlog types are added in 
the
   // future, the changes can be picked back to the old versions without 
breaking
@@ -1157,8 +1158,7 @@ enum TBinlogType {
   //    MODIFY_XXX = 17,
   //    MIN_UNKNOWN = 18,
   //    UNKNOWN_3 = 19,
-  MIN_UNKNOWN = 18,
-  UNKNOWN_3 = 19,
+  MIN_UNKNOWN = 19,
   UNKNOWN_4 = 20,
   UNKNOWN_5 = 21,
   UNKNOWN_6 = 22,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to