This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bbc893c953 [Enhancement](binlog) Add ModifyPartition, BatchModifyPartitions && ReplacePartitionOperationLog support (#23773) bbc893c953 is described below commit bbc893c953f6cefd5179320d14d61dc2b3022d3d Author: Jack Drogon <jack.xsuper...@gmail.com> AuthorDate: Sat Sep 2 13:19:07 2023 +0800 [Enhancement](binlog) Add ModifyPartition, BatchModifyPartitions && ReplacePartitionOperationLog support (#23773) --- .../org/apache/doris/binlog/BinlogManager.java | 26 +++++++++++++++++ .../main/java/org/apache/doris/catalog/Env.java | 5 ++-- .../doris/persist/BatchModifyPartitionsInfo.java | 34 ++++++++++++++++++++++ .../java/org/apache/doris/persist/EditLog.java | 17 +++++++++-- .../persist/ReplacePartitionOperationLog.java | 21 +++++++++++-- gensrc/thrift/FrontendService.thrift | 2 ++ 6 files changed, 98 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 3047146162..aee3f6da79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -24,9 +24,11 @@ import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.persist.AlterDatabasePropertyInfo; import org.apache.doris.persist.BarrierLog; +import org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.BinlogGcInfo; import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.ModifyTablePropertyOperationLog; +import org.apache.doris.persist.ReplacePartitionOperationLog; import org.apache.doris.persist.TableAddOrDropColumnsInfo; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; @@ -269,6 +271,30 @@ public class BinlogManager { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); } + // add Modify partitions + public void addModifyPartitions(BatchModifyPartitionsInfo info, long commitSeq) { + long dbId = info.getDbId(); + List<Long> tableIds = Lists.newArrayList(); + tableIds.add(info.getTableId()); + long timestamp = -1; + TBinlogType type = TBinlogType.MODIFY_PARTITIONS; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + } + + // add Replace partition + public void addReplacePartitions(ReplacePartitionOperationLog info, long commitSeq) { + long dbId = info.getDbId(); + List<Long> tableIds = Lists.newArrayList(); + tableIds.add(info.getTblId()); + long timestamp = -1; + TBinlogType type = TBinlogType.REPLACE_PARTITIONS; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + } + // get binlog by dbId, return first binlog.version > version public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index af86bb32ef..89b424f647 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -5194,8 +5194,9 @@ public class Env { olapTable.replaceTempPartitions(partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName); // write log - ReplacePartitionOperationLog info = new ReplacePartitionOperationLog(db.getId(), olapTable.getId(), - partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName); + ReplacePartitionOperationLog info = + new ReplacePartitionOperationLog(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(), + partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName); editLog.logReplaceTempPartition(info); LOG.info("finished to replace partitions {} with temp partitions {} from table: {}", clause.getPartitionNames(), clause.getTempPartitionNames(), olapTable.getName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchModifyPartitionsInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchModifyPartitionsInfo.java index 8d79957e88..fe7e8dbe76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchModifyPartitionsInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchModifyPartitionsInfo.java @@ -21,6 +21,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; +import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import java.io.DataInput; @@ -37,9 +38,38 @@ public class BatchModifyPartitionsInfo implements Writable { private List<ModifyPartitionInfo> infos; public BatchModifyPartitionsInfo(List<ModifyPartitionInfo> infos) { + if (infos == null || infos.isEmpty()) { + throw new IllegalArgumentException("infos is null or empty"); + } + + long dbId = infos.get(0).getDbId(); + long tableId = infos.get(0).getTableId(); + for (ModifyPartitionInfo info : infos) { + if (info.getDbId() != dbId || info.getTableId() != tableId) { + throw new IllegalArgumentException("dbId or tableId is not equal"); + } + } + this.infos = infos; } + public BatchModifyPartitionsInfo(ModifyPartitionInfo info) { + if (info == null) { + throw new IllegalArgumentException("info is null"); + } + + this.infos = Lists.newArrayList(); + this.infos.add(info); + } + + public long getDbId() { + return infos.get(0).getDbId(); + } + + public long getTableId() { + return infos.get(0).getTableId(); + } + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); @@ -72,4 +102,8 @@ public class BatchModifyPartitionsInfo implements Writable { } return true; } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 9f9baf276a..8cdf5eb978 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -261,7 +261,9 @@ public class EditLog { ModifyPartitionInfo info = (ModifyPartitionInfo) journal.getData(); LOG.info("Begin to unprotect modify partition. db = " + info.getDbId() + " table = " + info.getTableId() + " partitionId = " + info.getPartitionId()); + BatchModifyPartitionsInfo infos = new BatchModifyPartitionsInfo(info); env.getAlterInstance().replayModifyPartition(info); + env.getBinlogManager().addModifyPartitions(infos, logId); break; } case OperationType.OP_BATCH_MODIFY_PARTITION: { @@ -269,6 +271,7 @@ public class EditLog { for (ModifyPartitionInfo modifyPartitionInfo : info.getModifyPartitionInfos()) { env.getAlterInstance().replayModifyPartition(modifyPartitionInfo); } + env.getBinlogManager().addModifyPartitions(info, logId); break; } case OperationType.OP_ERASE_TABLE: { @@ -799,6 +802,7 @@ public class EditLog { ReplacePartitionOperationLog replaceTempPartitionLog = (ReplacePartitionOperationLog) journal.getData(); env.replayReplaceTempPartition(replaceTempPartitionLog); + env.getBinlogManager().addReplacePartitions(replaceTempPartitionLog, logId); break; } case OperationType.OP_INSTALL_PLUGIN: { @@ -1289,11 +1293,16 @@ public class EditLog { } public void logModifyPartition(ModifyPartitionInfo info) { - logEdit(OperationType.OP_MODIFY_PARTITION, info); + long logId = logEdit(OperationType.OP_MODIFY_PARTITION, info); + BatchModifyPartitionsInfo infos = new BatchModifyPartitionsInfo(info); + LOG.info("log modify partition, logId:{}, infos: {}", logId, infos.toJson()); + Env.getCurrentEnv().getBinlogManager().addModifyPartitions(infos, logId); } public void logBatchModifyPartition(BatchModifyPartitionsInfo info) { - logEdit(OperationType.OP_BATCH_MODIFY_PARTITION, info); + long logId = logEdit(OperationType.OP_BATCH_MODIFY_PARTITION, info); + LOG.info("log modify partition, logId:{}, infos: {}", logId, info.toJson()); + Env.getCurrentEnv().getBinlogManager().addModifyPartitions(info, logId); } public void logDropTable(DropInfo info) { @@ -1721,7 +1730,9 @@ public class EditLog { } public void logReplaceTempPartition(ReplacePartitionOperationLog info) { - logEdit(OperationType.OP_REPLACE_TEMP_PARTITION, info); + long logId = logEdit(OperationType.OP_REPLACE_TEMP_PARTITION, info); + LOG.info("log replace temp partition, logId: {}, info: {}", logId, info.toJson()); + Env.getCurrentEnv().getBinlogManager().addReplacePartitions(info, logId); } public void logInstallPlugin(PluginInfo plugin) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplacePartitionOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplacePartitionOperationLog.java index 9571634141..358b45d5bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplacePartitionOperationLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplacePartitionOperationLog.java @@ -35,8 +35,12 @@ public class ReplacePartitionOperationLog implements Writable { @SerializedName(value = "dbId") private long dbId; + @SerializedName(value = "dbName") + private String dbName; @SerializedName(value = "tblId") private long tblId; + @SerializedName(value = "tblName") + private String tblName; @SerializedName(value = "partitions") private List<String> partitions; @SerializedName(value = "tempPartitions") @@ -46,10 +50,14 @@ public class ReplacePartitionOperationLog implements Writable { @SerializedName(value = "useTempPartitionName") private boolean useTempPartitionName; - public ReplacePartitionOperationLog(long dbId, long tblId, List<String> partitionNames, - List<String> tempPartitonNames, boolean strictRange, boolean useTempPartitionName) { + public ReplacePartitionOperationLog(long dbId, String dbName, long tblId, String tblName, + List<String> partitionNames, + List<String> tempPartitonNames, boolean strictRange, + boolean useTempPartitionName) { this.dbId = dbId; + this.dbName = dbName; this.tblId = tblId; + this.tblName = tblName; this.partitions = partitionNames; this.tempPartitions = tempPartitonNames; this.strictRange = strictRange; @@ -90,4 +98,13 @@ public class ReplacePartitionOperationLog implements Writable { String json = GsonUtils.GSON.toJson(this); Text.writeString(out, json); } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + @Override + public String toString() { + return toJson(); + } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 7199484ed1..c7c4766786 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1012,6 +1012,8 @@ enum TBinlogType { ALTER_DATABASE_PROPERTY = 8, MODIFY_TABLE_PROPERTY = 9, BARRIER = 10, + MODIFY_PARTITIONS = 11, + REPLACE_PARTITIONS = 12, } struct TBinlog { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org