This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 46e4400a154 branch-3.0: [feat](binlog) Add Support recover binlog #44818 (#45293)
46e4400a154 is described below

commit 46e4400a15408d7df1e7e702968ee7857de2d438
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 12 19:45:30 2024 +0800

    branch-3.0: [feat](binlog) Add Support recover binlog #44818 (#45293)
    
    Cherry-picked from #44818
    
    Co-authored-by: Vallish Pai <vallish...@gmail.com>
---
 .../org/apache/doris/binlog/BinlogManager.java     | 27 ++++++++++++++++++
 .../java/org/apache/doris/binlog/DBBinlog.java     | 33 +++++++++++++++-------
 .../apache/doris/catalog/CatalogRecycleBin.java    |  6 ++--
 .../apache/doris/datasource/InternalCatalog.java   |  2 +-
 .../java/org/apache/doris/persist/EditLog.java     |  8 ++++--
 .../java/org/apache/doris/persist/RecoverInfo.java | 22 +++++++++++++--
 .../doris/persist/DropAndRecoverInfoTest.java      |  2 +-
 gensrc/thrift/FrontendService.thrift               |  4 +--
 8 files changed, 84 insertions(+), 20 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 67bb99a8bcd..b9eb91cc5f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -34,6 +34,7 @@ import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.DropPartitionInfo;
 import org.apache.doris.persist.ModifyCommentOperationLog;
 import org.apache.doris.persist.ModifyTablePropertyOperationLog;
+import org.apache.doris.persist.RecoverInfo;
 import org.apache.doris.persist.ReplacePartitionOperationLog;
 import org.apache.doris.persist.ReplaceTableOperationLog;
 import org.apache.doris.persist.TableAddOrDropColumnsInfo;
@@ -446,6 +447,32 @@ public class BinlogManager {
         addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, 
info);
     }
 
+
+    private boolean supportedRecoverInfo(RecoverInfo info) {
+        //table name and partitionName added together.
+        // recover table case, tablename must exist in newer version
+        // recover partition case also table name must exist.
+        // so checking only table name here.
+        if (StringUtils.isEmpty(info.getTableName())) {
+            LOG.warn("skip recover info binlog, because tableName is empty. 
info: {}", info);
+            return false;
+        }
+        return true;
+    }
+
+    public void addRecoverTableRecord(RecoverInfo info, long commitSeq) {
+        if (supportedRecoverInfo(info) == false) {
+            return;
+        }
+        long dbId = info.getDbId();
+        List<Long> tableIds = Lists.newArrayList();
+        tableIds.add(info.getTableId());
+        long timestamp = -1;
+        TBinlogType type = TBinlogType.RECOVER_INFO;
+        String data = info.toJson();
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, 
info);
+    }
+
     // get binlog by dbId, return first binlog.version > version
     public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long 
prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index b78ed389a0f..0816564f150 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.RecoverInfo;
 import org.apache.doris.persist.ReplaceTableOperationLog;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
@@ -124,7 +125,7 @@ public class DBBinlog {
 
         allBinlogs.add(binlog);
         binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
-        recordDroppedResources(binlog);
+        recordDroppedOrRecoveredResources(binlog);
 
         if (tableIds == null) {
             return;
@@ -178,7 +179,7 @@ public class DBBinlog {
                 return;
             }
 
-            recordDroppedResources(binlog, raw);
+            recordDroppedOrRecoveredResources(binlog, raw);
 
             switch (binlog.getType()) {
                 case CREATE_TABLE:
@@ -623,16 +624,16 @@ public class DBBinlog {
         }
     }
 
-    private void recordDroppedResources(TBinlog binlog) {
-        recordDroppedResources(binlog, null);
+    private void recordDroppedOrRecoveredResources(TBinlog binlog) {
+        recordDroppedOrRecoveredResources(binlog, null);
     }
 
     // A method to record the dropped tables, indexes, and partitions.
-    private void recordDroppedResources(TBinlog binlog, Object raw) {
-        recordDroppedResources(binlog.getType(), binlog.getCommitSeq(), 
binlog.getData(), raw);
+    private void recordDroppedOrRecoveredResources(TBinlog binlog, Object raw) 
{
+        recordDroppedOrRecoveredResources(binlog.getType(), 
binlog.getCommitSeq(), binlog.getData(), raw);
     }
 
-    private void recordDroppedResources(TBinlogType binlogType, long 
commitSeq, String data, Object raw) {
+    private void recordDroppedOrRecoveredResources(TBinlogType binlogType, 
long commitSeq, String data, Object raw) {
         if (raw == null) {
             switch (binlogType) {
                 case DROP_PARTITION:
@@ -656,6 +657,9 @@ public class DBBinlog {
                 case BARRIER:
                     raw = BarrierLog.fromJson(data);
                     break;
+                case RECOVER_INFO:
+                    raw = RecoverInfo.fromJson(data);
+                    break;
                 default:
                     break;
             }
@@ -664,10 +668,10 @@ public class DBBinlog {
             }
         }
 
-        recordDroppedResources(binlogType, commitSeq, raw);
+        recordDroppedOrRecoveredResources(binlogType, commitSeq, raw);
     }
 
-    private void recordDroppedResources(TBinlogType binlogType, long 
commitSeq, Object raw) {
+    private void recordDroppedOrRecoveredResources(TBinlogType binlogType, 
long commitSeq, Object raw) {
         if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof 
DropPartitionInfo) {
             long partitionId = ((DropPartitionInfo) raw).getPartitionId();
             if (partitionId > 0) {
@@ -706,7 +710,16 @@ public class DBBinlog {
             BarrierLog log = (BarrierLog) raw;
             // keep compatible with doris 2.0/2.1
             if (log.hasBinlog()) {
-                recordDroppedResources(log.getBinlogType(), commitSeq, 
log.getBinlog(), null);
+                recordDroppedOrRecoveredResources(log.getBinlogType(), 
commitSeq, log.getBinlog(), null);
+            }
+        } else if ((binlogType == TBinlogType.RECOVER_INFO) && (raw instanceof 
RecoverInfo)) {
+            RecoverInfo recoverInfo = (RecoverInfo) raw;
+            long partitionId = recoverInfo.getPartitionId();
+            long tableId = recoverInfo.getTableId();
+            if (partitionId > 0) {
+                droppedPartitions.removeIf(entry -> (entry.first == 
partitionId));
+            } else if (tableId > 0) {
+                droppedTables.removeIf(entry -> (entry.first == tableId));
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index 745c1c8a351..b5899435343 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -774,7 +774,8 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable, GsonPos
                 LOG.info("replay recover table[{}]", table.getId());
             } else {
                 // log
-                RecoverInfo recoverInfo = new RecoverInfo(db.getId(), 
table.getId(), -1L, "", newTableName, "");
+                RecoverInfo recoverInfo = new RecoverInfo(db.getId(), 
table.getId(),
+                                                    -1L, "", table.getName(), 
newTableName, "", "");
                 Env.getCurrentEnv().getEditLog().logRecoverTable(recoverInfo);
             }
             // Only olap table need recover dynamic partition, other table 
like jdbc odbc view.. do not need it
@@ -873,7 +874,8 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable, GsonPos
         idToRecycleTime.remove(partitionId);
 
         // log
-        RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), 
partitionId, "", "", newPartitionName);
+        RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), 
partitionId, "",
+                                                    table.getName(), "", 
partitionName, newPartitionName);
         Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
         LOG.info("recover partition[{}]", partitionId);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 53641d70c3e..b3ef531660b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -657,7 +657,7 @@ public class InternalCatalog implements CatalogIf<Database> 
{
             fullNameToDb.put(db.getFullName(), db);
             idToDb.put(db.getId(), db);
             // log
-            RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L, 
newDbName, "", "");
+            RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L, 
newDbName, "", "", "", "");
             Env.getCurrentEnv().getEditLog().logRecoverDb(recoverInfo);
             db.unmarkDropped();
         } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index f1377e9daeb..73a2c6338da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -297,11 +297,13 @@ public class EditLog {
                 case OperationType.OP_RECOVER_TABLE: {
                     RecoverInfo info = (RecoverInfo) journal.getData();
                     env.replayRecoverTable(info);
+                    env.getBinlogManager().addRecoverTableRecord(info, logId);
                     break;
                 }
                 case OperationType.OP_RECOVER_PARTITION: {
                     RecoverInfo info = (RecoverInfo) journal.getData();
                     env.replayRecoverPartition(info);
+                    env.getBinlogManager().addRecoverTableRecord(info, logId);
                     break;
                 }
                 case OperationType.OP_RENAME_TABLE: {
@@ -1444,7 +1446,8 @@ public class EditLog {
     }
 
     public void logRecoverPartition(RecoverInfo info) {
-        logEdit(OperationType.OP_RECOVER_PARTITION, info);
+        long logId = logEdit(OperationType.OP_RECOVER_PARTITION, info);
+        Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, 
logId);
     }
 
     public void logModifyPartition(ModifyPartitionInfo info) {
@@ -1471,7 +1474,8 @@ public class EditLog {
     }
 
     public void logRecoverTable(RecoverInfo info) {
-        logEdit(OperationType.OP_RECOVER_TABLE, info);
+        long logId = logEdit(OperationType.OP_RECOVER_TABLE, info);
+        Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, 
logId);
     }
 
     public void logDropRollup(DropInfo info) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java
index 15764a99b43..eb4af6494e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java
@@ -38,10 +38,14 @@ public class RecoverInfo implements Writable, 
GsonPostProcessable {
     private String newDbName;
     @SerializedName(value = "tableId")
     private long tableId;
+    @SerializedName(value = "tableName")
+    private String tableName;                        /// added for table name.
     @SerializedName(value = "newTableName")
     private String newTableName;
     @SerializedName(value = "partitionId")
     private long partitionId;
+    @SerializedName(value = "partitionName")
+    private String partitionName;
     @SerializedName(value = "newPartitionName")
     private String newPartitionName;
 
@@ -49,13 +53,15 @@ public class RecoverInfo implements Writable, 
GsonPostProcessable {
         // for persist
     }
 
-    public RecoverInfo(long dbId, long tableId, long partitionId, String 
newDbName, String newTableName,
-                       String newPartitionName) {
+    public RecoverInfo(long dbId, long tableId, long partitionId, String 
newDbName, String tableName,
+                        String newTableName, String partitionName, String 
newPartitionName) {
         this.dbId = dbId;
         this.tableId = tableId;
+        this.tableName = tableName;
         this.partitionId = partitionId;
         this.newDbName = newDbName;
         this.newTableName = newTableName;
+        this.partitionName = partitionName;
         this.newPartitionName = newPartitionName;
     }
 
@@ -67,6 +73,10 @@ public class RecoverInfo implements Writable, 
GsonPostProcessable {
         return tableId;
     }
 
+    public String getTableName() {
+        return tableName;
+    }
+
     public long getPartitionId() {
         return partitionId;
     }
@@ -109,4 +119,12 @@ public class RecoverInfo implements Writable, 
GsonPostProcessable {
     public void gsonPostProcess() throws IOException {
         newDbName = ClusterNamespace.getNameFromFullName(newDbName);
     }
+
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    public static RecoverInfo fromJson(String json) {
+        return GsonUtils.GSON.fromJson(json, RecoverInfo.class);
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
index 8c74fba2753..63afe375548 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
@@ -86,7 +86,7 @@ public class DropAndRecoverInfoTest {
         file.createNewFile();
         DataOutputStream dos = new DataOutputStream(new 
FileOutputStream(file));
 
-        RecoverInfo info1 = new RecoverInfo(1, 2, 3, "a", "b", "c");
+        RecoverInfo info1 = new RecoverInfo(1, 2, 3, "a", "", "b", "", "c");
         info1.write(dos);
         dos.flush();
         dos.close();
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index a18f7a26297..cb2f3fe9b9a 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1198,6 +1198,7 @@ enum TBinlogType {
   RENAME_ROLLUP = 21,
   RENAME_PARTITION = 22,
   DROP_ROLLUP = 23,
+  RECOVER_INFO = 24,
 
   // Keep some IDs for allocation so that when new binlog types are added in 
the
   // future, the changes can be picked back to the old versions without 
breaking
@@ -1214,8 +1215,7 @@ enum TBinlogType {
   //    MODIFY_XXX = 17,
   //    MIN_UNKNOWN = 18,
   //    UNKNOWN_3 = 19,
-  MIN_UNKNOWN = 24,
-  UNKNOWN_9 = 25,
+  MIN_UNKNOWN = 25,
   UNKNOWN_10 = 26,
   UNKNOWN_11 = 27,
   UNKNOWN_12 = 28,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to