This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bbc893c953 [Enhancement](binlog) Add ModifyPartition, 
BatchModifyPartitions && ReplacePartitionOperationLog support (#23773)
bbc893c953 is described below

commit bbc893c953f6cefd5179320d14d61dc2b3022d3d
Author: Jack Drogon <jack.xsuper...@gmail.com>
AuthorDate: Sat Sep 2 13:19:07 2023 +0800

    [Enhancement](binlog) Add ModifyPartition, BatchModifyPartitions && 
ReplacePartitionOperationLog support (#23773)
---
 .../org/apache/doris/binlog/BinlogManager.java     | 26 +++++++++++++++++
 .../main/java/org/apache/doris/catalog/Env.java    |  5 ++--
 .../doris/persist/BatchModifyPartitionsInfo.java   | 34 ++++++++++++++++++++++
 .../java/org/apache/doris/persist/EditLog.java     | 17 +++++++++--
 .../persist/ReplacePartitionOperationLog.java      | 21 +++++++++++--
 gensrc/thrift/FrontendService.thrift               |  2 ++
 6 files changed, 98 insertions(+), 7 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 3047146162..aee3f6da79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -24,9 +24,11 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.persist.AlterDatabasePropertyInfo;
 import org.apache.doris.persist.BarrierLog;
+import org.apache.doris.persist.BatchModifyPartitionsInfo;
 import org.apache.doris.persist.BinlogGcInfo;
 import org.apache.doris.persist.DropPartitionInfo;
 import org.apache.doris.persist.ModifyTablePropertyOperationLog;
+import org.apache.doris.persist.ReplacePartitionOperationLog;
 import org.apache.doris.persist.TableAddOrDropColumnsInfo;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
@@ -269,6 +271,30 @@ public class BinlogManager {
         addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
     }
 
+    // add Modify partitions
+    public void addModifyPartitions(BatchModifyPartitionsInfo info, long 
commitSeq) {
+        long dbId = info.getDbId();
+        List<Long> tableIds = Lists.newArrayList();
+        tableIds.add(info.getTableId());
+        long timestamp = -1;
+        TBinlogType type = TBinlogType.MODIFY_PARTITIONS;
+        String data = info.toJson();
+
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
+    }
+
+    // add Replace partition
+    public void addReplacePartitions(ReplacePartitionOperationLog info, long 
commitSeq) {
+        long dbId = info.getDbId();
+        List<Long> tableIds = Lists.newArrayList();
+        tableIds.add(info.getTblId());
+        long timestamp = -1;
+        TBinlogType type = TBinlogType.REPLACE_PARTITIONS;
+        String data = info.toJson();
+
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
+    }
+
     // get binlog by dbId, return first binlog.version > version
     public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long 
prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index af86bb32ef..89b424f647 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5194,8 +5194,9 @@ public class Env {
         olapTable.replaceTempPartitions(partitionNames, tempPartitionNames, 
isStrictRange, useTempPartitionName);
 
         // write log
-        ReplacePartitionOperationLog info = new 
ReplacePartitionOperationLog(db.getId(), olapTable.getId(),
-                partitionNames, tempPartitionNames, isStrictRange, 
useTempPartitionName);
+        ReplacePartitionOperationLog info =
+                new ReplacePartitionOperationLog(db.getId(), db.getFullName(), 
olapTable.getId(), olapTable.getName(),
+                        partitionNames, tempPartitionNames, isStrictRange, 
useTempPartitionName);
         editLog.logReplaceTempPartition(info);
         LOG.info("finished to replace partitions {} with temp partitions {} 
from table: {}", clause.getPartitionNames(),
                 clause.getTempPartitionNames(), olapTable.getName());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchModifyPartitionsInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchModifyPartitionsInfo.java
index 8d79957e88..fe7e8dbe76 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchModifyPartitionsInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchModifyPartitionsInfo.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
+import com.google.common.collect.Lists;
 import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
@@ -37,9 +38,38 @@ public class BatchModifyPartitionsInfo implements Writable {
     private List<ModifyPartitionInfo> infos;
 
     public BatchModifyPartitionsInfo(List<ModifyPartitionInfo> infos) {
+        if (infos == null || infos.isEmpty()) {
+            throw new IllegalArgumentException("infos is null or empty");
+        }
+
+        long dbId = infos.get(0).getDbId();
+        long tableId = infos.get(0).getTableId();
+        for (ModifyPartitionInfo info : infos) {
+            if (info.getDbId() != dbId || info.getTableId() != tableId) {
+                throw new IllegalArgumentException("dbId or tableId is not 
equal");
+            }
+        }
+
         this.infos = infos;
     }
 
+    public BatchModifyPartitionsInfo(ModifyPartitionInfo info) {
+        if (info == null) {
+            throw new IllegalArgumentException("info is null");
+        }
+
+        this.infos = Lists.newArrayList();
+        this.infos.add(info);
+    }
+
+    public long getDbId() {
+        return infos.get(0).getDbId();
+    }
+
+    public long getTableId() {
+        return infos.get(0).getTableId();
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
@@ -72,4 +102,8 @@ public class BatchModifyPartitionsInfo implements Writable {
         }
         return true;
     }
+
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 9f9baf276a..8cdf5eb978 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -261,7 +261,9 @@ public class EditLog {
                     ModifyPartitionInfo info = (ModifyPartitionInfo) 
journal.getData();
                     LOG.info("Begin to unprotect modify partition. db = " + 
info.getDbId() + " table = "
                             + info.getTableId() + " partitionId = " + 
info.getPartitionId());
+                    BatchModifyPartitionsInfo infos = new 
BatchModifyPartitionsInfo(info);
                     env.getAlterInstance().replayModifyPartition(info);
+                    env.getBinlogManager().addModifyPartitions(infos, logId);
                     break;
                 }
                 case OperationType.OP_BATCH_MODIFY_PARTITION: {
@@ -269,6 +271,7 @@ public class EditLog {
                     for (ModifyPartitionInfo modifyPartitionInfo : 
info.getModifyPartitionInfos()) {
                         
env.getAlterInstance().replayModifyPartition(modifyPartitionInfo);
                     }
+                    env.getBinlogManager().addModifyPartitions(info, logId);
                     break;
                 }
                 case OperationType.OP_ERASE_TABLE: {
@@ -799,6 +802,7 @@ public class EditLog {
                     ReplacePartitionOperationLog replaceTempPartitionLog =
                             (ReplacePartitionOperationLog) journal.getData();
                     env.replayReplaceTempPartition(replaceTempPartitionLog);
+                    
env.getBinlogManager().addReplacePartitions(replaceTempPartitionLog, logId);
                     break;
                 }
                 case OperationType.OP_INSTALL_PLUGIN: {
@@ -1289,11 +1293,16 @@ public class EditLog {
     }
 
     public void logModifyPartition(ModifyPartitionInfo info) {
-        logEdit(OperationType.OP_MODIFY_PARTITION, info);
+        long logId = logEdit(OperationType.OP_MODIFY_PARTITION, info);
+        BatchModifyPartitionsInfo infos = new BatchModifyPartitionsInfo(info);
+        LOG.info("log modify partition, logId:{}, infos: {}", logId, 
infos.toJson());
+        Env.getCurrentEnv().getBinlogManager().addModifyPartitions(infos, 
logId);
     }
 
     public void logBatchModifyPartition(BatchModifyPartitionsInfo info) {
-        logEdit(OperationType.OP_BATCH_MODIFY_PARTITION, info);
+        long logId = logEdit(OperationType.OP_BATCH_MODIFY_PARTITION, info);
+        LOG.info("log modify partition, logId:{}, infos: {}", logId, 
info.toJson());
+        Env.getCurrentEnv().getBinlogManager().addModifyPartitions(info, 
logId);
     }
 
     public void logDropTable(DropInfo info) {
@@ -1721,7 +1730,9 @@ public class EditLog {
     }
 
     public void logReplaceTempPartition(ReplacePartitionOperationLog info) {
-        logEdit(OperationType.OP_REPLACE_TEMP_PARTITION, info);
+        long logId = logEdit(OperationType.OP_REPLACE_TEMP_PARTITION, info);
+        LOG.info("log replace temp partition, logId: {}, info: {}", logId, 
info.toJson());
+        Env.getCurrentEnv().getBinlogManager().addReplacePartitions(info, 
logId);
     }
 
     public void logInstallPlugin(PluginInfo plugin) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplacePartitionOperationLog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplacePartitionOperationLog.java
index 9571634141..358b45d5bf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplacePartitionOperationLog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplacePartitionOperationLog.java
@@ -35,8 +35,12 @@ public class ReplacePartitionOperationLog implements 
Writable {
 
     @SerializedName(value = "dbId")
     private long dbId;
+    @SerializedName(value = "dbName")
+    private String dbName;
     @SerializedName(value = "tblId")
     private long tblId;
+    @SerializedName(value = "tblName")
+    private String tblName;
     @SerializedName(value = "partitions")
     private List<String> partitions;
     @SerializedName(value = "tempPartitions")
@@ -46,10 +50,14 @@ public class ReplacePartitionOperationLog implements 
Writable {
     @SerializedName(value = "useTempPartitionName")
     private boolean useTempPartitionName;
 
-    public ReplacePartitionOperationLog(long dbId, long tblId, List<String> 
partitionNames,
-            List<String> tempPartitonNames, boolean strictRange, boolean 
useTempPartitionName) {
+    public ReplacePartitionOperationLog(long dbId, String dbName, long tblId, 
String tblName,
+                                        List<String> partitionNames,
+                                        List<String> tempPartitonNames, 
boolean strictRange,
+                                        boolean useTempPartitionName) {
         this.dbId = dbId;
+        this.dbName = dbName;
         this.tblId = tblId;
+        this.tblName = tblName;
         this.partitions = partitionNames;
         this.tempPartitions = tempPartitonNames;
         this.strictRange = strictRange;
@@ -90,4 +98,13 @@ public class ReplacePartitionOperationLog implements 
Writable {
         String json = GsonUtils.GSON.toJson(this);
         Text.writeString(out, json);
     }
+
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    @Override
+    public String toString() {
+        return toJson();
+    }
 }
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 7199484ed1..c7c4766786 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1012,6 +1012,8 @@ enum TBinlogType {
   ALTER_DATABASE_PROPERTY = 8,
   MODIFY_TABLE_PROPERTY = 9,
   BARRIER = 10,
+  MODIFY_PARTITIONS = 11,
+  REPLACE_PARTITIONS = 12,
 }
 
 struct TBinlog {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to