This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new a220f96a3a4 branch-3.0: [feat](binlog) add modify distribution bucket num binlog #49894 (#50068) a220f96a3a4 is described below commit a220f96a3a41962011eba9c5a95ca4a09d8a1f85 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed May 7 14:30:49 2025 +0800 branch-3.0: [feat](binlog) add modify distribution bucket num binlog #49894 (#50068) Cherry-picked from #49894 Co-authored-by: koarz <li...@selectdb.com> --- .../org/apache/doris/binlog/BinlogManager.java | 17 +++++++++++++ .../org/apache/doris/catalog/DistributionInfo.java | 8 ++++++ .../main/java/org/apache/doris/catalog/Env.java | 5 ++-- .../apache/doris/catalog/HashDistributionInfo.java | 15 ++++++----- .../doris/catalog/RandomDistributionInfo.java | 5 ++++ .../java/org/apache/doris/persist/EditLog.java | 7 ++++-- ...leDefaultDistributionBucketNumOperationLog.java | 29 +++++++++++++++++++++- gensrc/thrift/FrontendService.thrift | 4 +-- 8 files changed, 77 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 3fe0a4d99ef..f11149af885 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -33,6 +33,7 @@ import org.apache.doris.persist.BinlogGcInfo; import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.ModifyCommentOperationLog; +import org.apache.doris.persist.ModifyTableDefaultDistributionBucketNumOperationLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; import org.apache.doris.persist.RecoverInfo; import org.apache.doris.persist.ReplacePartitionOperationLog; @@ -435,6 +436,22 @@ public class BinlogManager { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, 
false, info); } + public void addModifyDistributionNum(ModifyTableDefaultDistributionBucketNumOperationLog info, long commitSeq) { + if (info.getBucketNum() <= 0 || info.getType() == null) { + LOG.warn("skip modify distribution num binlog, because bucket num is invalid. info: {}", info); + return; + } + + long dbId = info.getDbId(); + List<Long> tableIds = Lists.newArrayList(); + tableIds.add(info.getTableId()); + long timestamp = System.currentTimeMillis(); + TBinlogType type = TBinlogType.MODIFY_DISTRIBUTION_BUCKET_NUM; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); + } + public void addModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info, long commitSeq) { long dbId = info.getDbId(); List<Long> tableIds = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java index 9b05f8ca709..e67b87b193d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java @@ -84,6 +84,14 @@ public abstract class DistributionInfo { throw new NotImplementedException("toDistributionDesc not implemented"); } + public boolean getAutoBucket() { + return autoBucket; + } + + public String getColumnsName() { + throw new NotImplementedException("getColumnsName not implemented"); + } + @Deprecated public void readFields(DataInput in) throws IOException { type = DistributionInfoType.valueOf(Text.readString(in)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f698612d2fd..1da6ffacf8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -5588,8 +5588,9 @@ public class Env { 
defaultDistributionInfo.setBucketNum(bucketNum); ModifyTableDefaultDistributionBucketNumOperationLog info - = new ModifyTableDefaultDistributionBucketNumOperationLog( - db.getId(), olapTable.getId(), bucketNum); + = new ModifyTableDefaultDistributionBucketNumOperationLog(db.getId(), olapTable.getId(), + distributionInfo.getType(), distributionInfo.getAutoBucket(), bucketNum, + defaultDistributionInfo.getColumnsName()); editLog.logModifyDefaultDistributionBucketNum(info); LOG.info("modify table[{}] default bucket num to {}", olapTable.getName(), bucketNum); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java index 7d1ab1db8d1..c6814b35cb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java @@ -127,16 +127,19 @@ public class HashDistributionInfo extends DistributionInfo { } @Override - public String toSql(boolean forSync) { - StringBuilder builder = new StringBuilder(); - builder.append("DISTRIBUTED BY HASH("); - + public String getColumnsName() { List<String> colNames = Lists.newArrayList(); for (Column column : distributionColumns) { colNames.add("`" + column.getName() + "`"); } - String colList = Joiner.on(", ").join(colNames); - builder.append(colList); + return Joiner.on(", ").join(colNames); + } + + @Override + public String toSql(boolean forSync) { + StringBuilder builder = new StringBuilder(); + builder.append("DISTRIBUTED BY HASH("); + builder.append(getColumnsName()); if (autoBucket && !forSync) { builder.append(") BUCKETS AUTO"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java index d225adbf0a2..0bce8ce88ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java @@ -51,6 +51,11 @@ public class RandomDistributionInfo extends DistributionInfo { return bucketNum; } + @Override + public String getColumnsName() { + return ""; + } + @Override public String toSql(boolean forSync) { StringBuilder builder = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 7b719c0fb0f..a06e2c4b693 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -845,6 +845,7 @@ public class EditLog { ModifyTableDefaultDistributionBucketNumOperationLog log = (ModifyTableDefaultDistributionBucketNumOperationLog) journal.getData(); env.replayModifyTableDefaultDistributionBucketNum(log); + env.getBinlogManager().addModifyDistributionNum(log, logId); break; } case OperationType.OP_REPLACE_TEMP_PARTITION: { @@ -1931,8 +1932,10 @@ public class EditLog { return logModifyTableProperty(OperationType.OP_MODIFY_REPLICATION_NUM, info); } - public void logModifyDefaultDistributionBucketNum(ModifyTableDefaultDistributionBucketNumOperationLog info) { - logEdit(OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM, info); + public void logModifyDefaultDistributionBucketNum(ModifyTableDefaultDistributionBucketNumOperationLog log) { + long logId = logEdit(OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM, log); + LOG.info("add modify distribution bucket num binlog, logId: {}, infos: {}", logId, log); + Env.getCurrentEnv().getBinlogManager().addModifyDistributionNum(log, logId); } public long logModifyTableProperties(ModifyTablePropertyOperationLog info) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTableDefaultDistributionBucketNumOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTableDefaultDistributionBucketNumOperationLog.java index 
151f29ea519..72e0fc0afb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTableDefaultDistributionBucketNumOperationLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTableDefaultDistributionBucketNumOperationLog.java @@ -17,6 +17,7 @@ package org.apache.doris.persist; +import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; @@ -33,13 +34,23 @@ public class ModifyTableDefaultDistributionBucketNumOperationLog implements Writ private long dbId; @SerializedName(value = "tableId") private long tableId; + @SerializedName(value = "type") + private DistributionInfoType type; + @SerializedName(value = "autoBucket") + protected boolean autoBucket; @SerializedName(value = "bucketNum") private int bucketNum; + @SerializedName(value = "columnsName") + private String columnsName; - public ModifyTableDefaultDistributionBucketNumOperationLog(long dbId, long tableId, int bucketNum) { + public ModifyTableDefaultDistributionBucketNumOperationLog(long dbId, long tableId, DistributionInfoType type, + boolean autoBucket, int bucketNum, String columnsName) { this.dbId = dbId; this.tableId = tableId; + this.type = type; + this.autoBucket = autoBucket; this.bucketNum = bucketNum; + this.columnsName = columnsName; } public long getDbId() { @@ -54,6 +65,18 @@ public class ModifyTableDefaultDistributionBucketNumOperationLog implements Writ return bucketNum; } + public DistributionInfoType getType() { + return type; + } + + public boolean getAutoBucket() { + return autoBucket; + } + + public String getColumnsName() { + return columnsName == null ? 
"" : columnsName; + } + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); @@ -62,4 +85,8 @@ public class ModifyTableDefaultDistributionBucketNumOperationLog implements Writ public static ModifyTableDefaultDistributionBucketNumOperationLog read(DataInput in) throws IOException { return GsonUtils.GSON.fromJson(Text.readString(in), ModifyTableDefaultDistributionBucketNumOperationLog.class); } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index c330eed1653..3c4f6fc1e90 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1225,6 +1225,7 @@ enum TBinlogType { RENAME_PARTITION = 22, DROP_ROLLUP = 23, RECOVER_INFO = 24, + MODIFY_DISTRIBUTION_BUCKET_NUM = 25 // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking @@ -1241,8 +1242,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 25, - UNKNOWN_10 = 26, + MIN_UNKNOWN = 26, UNKNOWN_11 = 27, UNKNOWN_12 = 28, UNKNOWN_13 = 29, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org