This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ef6258321cf [feat](binlog) Add replace table binlog #44263 (#44518)
ef6258321cf is described below

commit ef6258321cf5ecaa4bc37016664fa26bafb1113e
Author: walter <maoch...@selectdb.com>
AuthorDate: Mon Nov 25 23:34:16 2024 +0800

    [feat](binlog) Add replace table binlog #44263 (#44518)
    
    cherry pick from #44263
---
 .../main/java/org/apache/doris/alter/Alter.java    |  2 +-
 .../org/apache/doris/binlog/BinlogManager.java     | 18 ++++++++
 .../java/org/apache/doris/binlog/DBBinlog.java     | 53 ++++++++++++++++------
 .../java/org/apache/doris/persist/BarrierLog.java  |  4 ++
 .../java/org/apache/doris/persist/EditLog.java     | 11 +++--
 .../doris/persist/ReplaceTableOperationLog.java    | 28 +++++++++++-
 .../persist/ReplaceTableOperationLogTest.java      |  4 +-
 gensrc/thrift/FrontendService.thrift               |  4 +-
 8 files changed, 101 insertions(+), 23 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index e6a2fe0229f..63e8206b2fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -601,7 +601,7 @@ public class Alter {
                 replaceTableInternal(db, origTable, olapNewTbl, swapTable, false);
                 // write edit log
                 ReplaceTableOperationLog log = new ReplaceTableOperationLog(db.getId(),
-                        origTable.getId(), olapNewTbl.getId(), swapTable);
+                        origTable.getId(), oldTblName, olapNewTbl.getId(), newTblName, swapTable);
                 Env.getCurrentEnv().getEditLog().logReplaceTable(log);
                 LOG.info("finish replacing table {} with table {}, is swap: {}", oldTblName, newTblName, swapTable);
             } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 6d483a41314..1f785713666 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -33,6 +33,7 @@ import org.apache.doris.persist.DropPartitionInfo;
 import org.apache.doris.persist.ModifyCommentOperationLog;
 import org.apache.doris.persist.ModifyTablePropertyOperationLog;
 import org.apache.doris.persist.ReplacePartitionOperationLog;
+import org.apache.doris.persist.ReplaceTableOperationLog;
 import org.apache.doris.persist.TableAddOrDropColumnsInfo;
 import org.apache.doris.persist.TableInfo;
 import org.apache.doris.persist.TableRenameColumnInfo;
@@ -45,6 +46,7 @@ import org.apache.doris.thrift.TStatusCode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -367,6 +369,22 @@ public class BinlogManager {
         addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterViewInfo);
     }
 
+    public void addReplaceTable(ReplaceTableOperationLog info, long commitSeq) {
+        if (StringUtils.isEmpty(info.getOrigTblName()) || StringUtils.isEmpty(info.getNewTblName())) {
+            LOG.warn("skip replace table binlog, because origTblName or newTblName is empty. info: {}", info);
+            return;
+        }
+
+        long dbId = info.getDbId();
+        List<Long> tableIds = Lists.newArrayList();
+        tableIds.add(info.getOrigTblId());
+        long timestamp = -1;
+        TBinlogType type = TBinlogType.REPLACE_TABLE;
+        String data = info.toJson();
+
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
+    }
+
     // get binlog by dbId, return first binlog.version > version
     public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index e2eef7966be..c96e994be91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -22,7 +22,9 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.proc.BaseProcResult;
+import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.ReplaceTableOperationLog;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
@@ -626,19 +628,29 @@ public class DBBinlog {
 
     // A method to record the dropped tables, indexes, and partitions.
     private void recordDroppedResources(TBinlog binlog, Object raw) {
+        recordDroppedResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw);
+    }
+
+    private void recordDroppedResources(TBinlogType binlogType, long commitSeq, String data, Object raw) {
         if (raw == null) {
-            switch (binlog.getType()) {
+            switch (binlogType) {
                 case DROP_PARTITION:
-                    raw = DropPartitionInfo.fromJson(binlog.data);
+                    raw = DropPartitionInfo.fromJson(data);
                     break;
                 case DROP_TABLE:
-                    raw = DropTableRecord.fromJson(binlog.data);
+                    raw = DropTableRecord.fromJson(data);
                     break;
                 case ALTER_JOB:
-                    raw = AlterJobRecord.fromJson(binlog.data);
+                    raw = AlterJobRecord.fromJson(data);
                     break;
                 case TRUNCATE_TABLE:
-                    raw = TruncateTableRecord.fromJson(binlog.data);
+                    raw = TruncateTableRecord.fromJson(data);
+                    break;
+                case REPLACE_TABLE:
+                    raw = ReplaceTableOperationLog.fromJson(data);
+                    break;
+                case BARRIER:
+                    raw = BarrierLog.fromJson(data);
                     break;
                 default:
                     break;
@@ -648,29 +660,44 @@ public class DBBinlog {
             }
         }
 
-        if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
+        recordDroppedResources(binlogType, commitSeq, raw);
+    }
+
+    private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Object raw) {
+        if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
             long partitionId = ((DropPartitionInfo) raw).getPartitionId();
             if (partitionId > 0) {
-                droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
+                droppedPartitions.add(Pair.of(partitionId, commitSeq));
             }
-        } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
+        } else if (binlogType == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
             long tableId = ((DropTableRecord) raw).getTableId();
             if (tableId > 0) {
-                droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
+                droppedTables.add(Pair.of(tableId, commitSeq));
             }
-        } else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
+        } else if (binlogType == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
             AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
             if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) {
                 for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
                     if (indexId != null && indexId > 0) {
-                        droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
+                        droppedIndexes.add(Pair.of(indexId, commitSeq));
                     }
                 }
             }
-        } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
+        } else if (binlogType == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
             TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw;
             for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
-                droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
+                droppedPartitions.add(Pair.of(partitionId, commitSeq));
+            }
+        } else if (binlogType == TBinlogType.REPLACE_TABLE && raw instanceof ReplaceTableOperationLog) {
+            ReplaceTableOperationLog record = (ReplaceTableOperationLog) raw;
+            if (!record.isSwapTable()) {
+                droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq));
+            }
+        } else if (binlogType == TBinlogType.BARRIER && raw instanceof BarrierLog) {
+            BarrierLog log = (BarrierLog) raw;
+            // keep compatible with doris 2.0/2.1
+            if (log.hasBinlog()) {
+                recordDroppedResources(log.getBinlogType(), commitSeq, log.getBinlog(), null);
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
index 4a9ce13e03b..86d56fb4a64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
@@ -109,6 +109,10 @@ public class BarrierLog implements Writable {
         return GsonUtils.GSON.toJson(this);
     }
 
+    public static BarrierLog fromJson(String json) {
+        return GsonUtils.GSON.fromJson(json, BarrierLog.class);
+    }
+
     @Override
     public String toString() {
         return toJson();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index df0cdb092a8..5ae6f62ebb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -306,7 +306,7 @@ public class EditLog {
                 case OperationType.OP_RENAME_TABLE: {
                     TableInfo info = (TableInfo) journal.getData();
                     env.replayRenameTable(info);
-                    Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
+                    env.getBinlogManager().addTableRename(info, logId);
                     break;
                 }
                 case OperationType.OP_MODIFY_VIEW_DEF: {
@@ -318,7 +318,7 @@ public class EditLog {
                 case OperationType.OP_RENAME_PARTITION: {
                     TableInfo info = (TableInfo) journal.getData();
                     env.replayRenamePartition(info);
-                    Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
+                    env.getBinlogManager().addTableRename(info, logId);
                     break;
                 }
                 case OperationType.OP_RENAME_COLUMN: {
@@ -366,7 +366,7 @@ public class EditLog {
                 case OperationType.OP_RENAME_ROLLUP: {
                     TableInfo info = (TableInfo) journal.getData();
                     env.replayRenameRollup(info);
-                    Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
+                    env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
                     break;
                 }
                 case OperationType.OP_LOAD_START:
@@ -898,6 +898,7 @@ public class EditLog {
                 case OperationType.OP_REPLACE_TABLE: {
                     ReplaceTableOperationLog log = (ReplaceTableOperationLog) journal.getData();
                     env.getAlterInstance().replayReplaceTable(log);
+                    env.getBinlogManager().addReplaceTable(log, logId);
                     break;
                 }
                 case OperationType.OP_CREATE_SQL_BLOCK_RULE: {
@@ -1950,7 +1951,9 @@ public class EditLog {
     }
 
     public void logReplaceTable(ReplaceTableOperationLog log) {
-        logEdit(OperationType.OP_REPLACE_TABLE, log);
+        long logId = logEdit(OperationType.OP_REPLACE_TABLE, log);
+        LOG.info("add replace table binlog, logId: {}, infos: {}", logId, log);
+        Env.getCurrentEnv().getBinlogManager().addReplaceTable(log, logId);
     }
 
     public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 op) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java
index c5b0a05f0e6..3d11de4af43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplaceTableOperationLog.java
@@ -32,15 +32,23 @@ public class ReplaceTableOperationLog implements Writable {
     private long dbId;
     @SerializedName(value = "origTblId")
     private long origTblId;
+    @SerializedName(value = "origTblName")
+    private String origTblName;
     @SerializedName(value = "newTblName")
     private long newTblId;
+    @SerializedName(value = "actualNewTblName")
+    private String newTblName;
     @SerializedName(value = "swapTable")
     private boolean swapTable;
 
-    public ReplaceTableOperationLog(long dbId, long origTblId, long newTblId, boolean swapTable) {
+    public ReplaceTableOperationLog(long dbId, long origTblId,
+            String origTblName, long newTblId, String newTblName,
+            boolean swapTable) {
         this.dbId = dbId;
         this.origTblId = origTblId;
+        this.origTblName = origTblName;
         this.newTblId = newTblId;
+        this.newTblName = newTblName;
         this.swapTable = swapTable;
     }
 
@@ -52,21 +60,37 @@ public class ReplaceTableOperationLog implements Writable {
         return origTblId;
     }
 
+    public String getOrigTblName() {
+        return origTblName;
+    }
+
     public long getNewTblId() {
         return newTblId;
     }
 
+    public String getNewTblName() {
+        return newTblName;
+    }
+
     public boolean isSwapTable() {
         return swapTable;
     }
 
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    public static ReplaceTableOperationLog fromJson(String json) {
+        return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class);
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
         Text.writeString(out, json);
     }
 
-    public static ReplaceTableOperationLog read(DataInput in) throws  IOException {
+    public static ReplaceTableOperationLog read(DataInput in) throws IOException {
         String json = Text.readString(in);
         return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class);
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
index ec71c24a13f..dddcacb48a3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/ReplaceTableOperationLogTest.java
@@ -34,7 +34,7 @@ public class ReplaceTableOperationLogTest {
         file.createNewFile();
         DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
 
-        ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, 3, true);
+        ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, "old", 3, "new", true);
         log.write(dos);
 
         dos.flush();
@@ -48,6 +48,8 @@ public class ReplaceTableOperationLogTest {
         Assert.assertTrue(readLog.getNewTblId() == log.getNewTblId());
         Assert.assertTrue(readLog.getOrigTblId() == log.getOrigTblId());
         Assert.assertTrue(readLog.isSwapTable() == log.isSwapTable());
+        Assert.assertTrue(readLog.getOrigTblName().equals(log.getOrigTblName()));
+        Assert.assertTrue(readLog.getNewTblName().equals(log.getNewTblName()));
 
         // 3. delete files
         dis.close();
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 5bce845beaa..4270513023a 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1190,6 +1190,7 @@ enum TBinlogType {
   RENAME_COLUMN = 15,
   MODIFY_COMMENT = 16,
   MODIFY_VIEW_DEF = 17,
+  REPLACE_TABLE = 18,
 
   // Keep some IDs for allocation so that when new binlog types are added in the
   // future, the changes can be picked back to the old versions without breaking
@@ -1206,8 +1207,7 @@ enum TBinlogType {
   //    MODIFY_XXX = 17,
   //    MIN_UNKNOWN = 18,
   //    UNKNOWN_3 = 19,
-  MIN_UNKNOWN = 18,
-  UNKNOWN_3 = 19,
+  MIN_UNKNOWN = 19,
   UNKNOWN_4 = 20,
   UNKNOWN_5 = 21,
   UNKNOWN_6 = 22,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to