This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new de39632f1b [feature](binlog) Add AddPartitionRecord && DROP_PARTITION (#21344) de39632f1b is described below commit de39632f1b7d4c53929a88daa7ed9f412a307dc1 Author: Jack Drogon <jack.xsuper...@gmail.com> AuthorDate: Fri Jun 30 16:57:11 2023 +0800 [feature](binlog) Add AddPartitionRecord && DROP_PARTITION (#21344) Signed-off-by: Jack Drogon <jack.xsuper...@gmail.com> --- .../apache/doris/binlog/AddPartitionRecord.java | 23 ++++++++++++++++++++++ .../org/apache/doris/binlog/BinlogManager.java | 12 +++++++++++ .../apache/doris/persist/DropPartitionInfo.java | 13 ++++++++++++ .../java/org/apache/doris/persist/EditLog.java | 12 +++++++---- gensrc/thrift/FrontendService.thrift | 2 ++ 5 files changed, 58 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java index 5dc2f62749..9bc5ff7da0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java @@ -18,6 +18,7 @@ package org.apache.doris.binlog; import org.apache.doris.catalog.DataProperty; +import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; @@ -51,6 +52,8 @@ public class AddPartitionRecord { private boolean isTempPartition = false; @SerializedName(value = "isMutable") private boolean isMutable = true; + @SerializedName(value = "sql") + private String sql; public AddPartitionRecord(long commitSeq, PartitionPersistInfo partitionPersistInfo) { this.commitSeq = commitSeq; @@ -64,6 +67,26 @@ public class AddPartitionRecord { this.isInMemory = partitionPersistInfo.isInMemory(); this.isTempPartition = partitionPersistInfo.isTempPartition(); this.isMutable = 
partitionPersistInfo.isMutable(); + + StringBuilder sb = new StringBuilder(); + sb.append("ADD PARTITION ").append("`").append(partition.getName()).append("`").append(" VALUES "); + if (this.listPartitionItem.equals(ListPartitionItem.DUMMY_ITEM)) { + // range + sb.append("["); + sb.append(range.lowerEndpoint().toSql()); + sb.append(", "); + sb.append(range.upperEndpoint().toSql()); + sb.append(")"); + } else { + // list + sb.append("IN ("); + sb.append(((ListPartitionItem) listPartitionItem).toSql()); + sb.append(")"); + } + sb.append("(\"version_info\" = \""); + sb.append(partition.getVisibleVersion()).append("\""); + sb.append(");"); + this.sql = sb.toString(); } public long getCommitSeq() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 59ba596152..bccc5dfc48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.persist.BinlogGcInfo; +import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; @@ -128,6 +129,17 @@ public class BinlogManager { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); } + public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) { + long dbId = dropPartitionInfo.getDbId(); + List<Long> tableIds = new ArrayList<Long>(); + tableIds.add(dropPartitionInfo.getTableId()); + long timestamp = -1; + TBinlogType type = TBinlogType.DROP_PARTITION; + String data = dropPartitionInfo.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + } + // get binlog by dbId, return first binlog.version > version public 
Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long commitSeq) { TStatus status = new TStatus(TStatusCode.OK); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java index 5a4a07f0cd..f4fac91526 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java @@ -40,6 +40,8 @@ public class DropPartitionInfo implements Writable { private boolean forceDrop = false; @SerializedName(value = "recycleTime") private long recycleTime = 0; + @SerializedName(value = "sql") + private String sql; private DropPartitionInfo() { } @@ -52,6 +54,17 @@ public class DropPartitionInfo implements Writable { this.isTempPartition = isTempPartition; this.forceDrop = forceDrop; this.recycleTime = recycleTime; + + StringBuilder sb = new StringBuilder(); + sb.append("DROP PARTITION "); + if (isTempPartition) { + sb.append("TEMPORARY "); + } + sb.append("`").append(partitionName).append("`"); + if (forceDrop) { + sb.append(" FORCE"); + } + this.sql = sb.toString(); } public Long getDbId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 7be7253f1d..b67ca9259e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -234,8 +234,8 @@ public class EditLog { "Begin to unprotect add partition. 
db = " + info.getDbId() + " table = " + info.getTableId() + " partitionName = " + info.getPartition().getName()); AddPartitionRecord addPartitionRecord = new AddPartitionRecord(logId, info); - Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(addPartitionRecord); env.replayAddPartition(info); + env.getBinlogManager().addAddPartitionRecord(addPartitionRecord); break; } case OperationType.OP_DROP_PARTITION: { @@ -243,6 +243,7 @@ public class EditLog { LOG.info("Begin to unprotect drop partition. db = " + info.getDbId() + " table = " + info.getTableId() + " partitionName = " + info.getPartitionName()); env.replayDropPartition(info); + env.getBinlogManager().addDropPartitionRecord(info, logId); break; } case OperationType.OP_MODIFY_PARTITION: { @@ -1204,14 +1205,17 @@ public class EditLog { logEdit(OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA, info); } - public void logAddPartition(PartitionPersistInfo info) { + public long logAddPartition(PartitionPersistInfo info) { long logId = logEdit(OperationType.OP_ADD_PARTITION, info); AddPartitionRecord record = new AddPartitionRecord(logId, info); Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(record); + return logId; } - public void logDropPartition(DropPartitionInfo info) { - logEdit(OperationType.OP_DROP_PARTITION, info); + public long logDropPartition(DropPartitionInfo info) { + long logId = logEdit(OperationType.OP_DROP_PARTITION, info); + Env.getCurrentEnv().getBinlogManager().addDropPartitionRecord(info, logId); + return logId; } public void logErasePartition(long partitionId) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index ca84522401..32e2e3b0a3 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -956,6 +956,8 @@ enum TBinlogType { UPSERT = 0, ADD_PARTITION = 1, CREATE_TABLE = 2, + DROP_PARTITION = 3, + DROP_TABLE = 4, } struct TBinlog { 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org