This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 15230b9b70b [chore](fe) Returns dropped partitions in GetMeta request (#38071) 15230b9b70b is described below commit 15230b9b70bde5872c082fca2095be7208860db8 Author: walter <w41te...@gmail.com> AuthorDate: Thu Jul 18 21:20:07 2024 +0800 [chore](fe) Returns dropped partitions in GetMeta request (#38071) Cherry-pick #37196, #37326. The CCR syncer needs to know the distribution of tables, partitions, indexes, and replicas when synchronizing binlogs. If a partition is deleted before the binlog synchronization is complete, the CCR syncer cannot continue synchronizing. This PR will record the deleted partitions and include them in the GetMeta response, allowing the CCR syncer to filter out the binlogs that belong to these partitions. The corresponding CCR syncer PR is selectdb/ccr-syncer#117. --- .../org/apache/doris/binlog/BinlogManager.java | 48 ++++++++++++++-------- .../java/org/apache/doris/binlog/DBBinlog.java | 47 ++++++++++++++++++++- .../java/org/apache/doris/binlog/TableBinlog.java | 23 +++++------ .../main/java/org/apache/doris/catalog/Env.java | 5 +++ .../apache/doris/datasource/InternalCatalog.java | 7 ++-- .../apache/doris/persist/DropPartitionInfo.java | 21 ++++++---- .../org/apache/doris/binlog/BinlogManagerTest.java | 20 ++++----- .../java/org/apache/doris/binlog/DbBinlogTest.java | 8 ++-- .../doris/persist/DropPartitionInfoTest.java | 15 +++---- gensrc/thrift/FrontendService.thrift | 1 + 10 files changed, 133 insertions(+), 62 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 454f678e2e1..77f2bf74e66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -97,7 +97,7 @@ public class BinlogManager { } } - private void addBinlog(TBinlog binlog) { + private void 
addBinlog(TBinlog binlog, Object raw) { if (!Config.enable_feature_binlog) { return; } @@ -116,11 +116,11 @@ public class BinlogManager { lock.writeLock().unlock(); } - dbBinlog.addBinlog(binlog); + dbBinlog.addBinlog(binlog, raw); } private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp, TBinlogType type, - String data, boolean removeEnableCache) { + String data, boolean removeEnableCache, Object raw) { if (!Config.enable_feature_binlog) { return; } @@ -152,7 +152,7 @@ public class BinlogManager { } if (anyEnable) { - addBinlog(binlog); + addBinlog(binlog, raw); } afterAddBinlog(binlog); @@ -166,7 +166,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.UPSERT; String data = upsertRecord.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, upsertRecord); } public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) { @@ -178,7 +178,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.ADD_PARTITION; String data = addPartitionRecord.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, addPartitionRecord); } public void addCreateTableRecord(CreateTableRecord createTableRecord) { @@ -190,7 +190,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.CREATE_TABLE; String data = createTableRecord.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, createTableRecord); } public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) { @@ -201,7 +201,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.DROP_PARTITION; String data = dropPartitionInfo.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, 
type, data, false, dropPartitionInfo); } public void addDropTableRecord(DropTableRecord record) { @@ -213,7 +213,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.DROP_TABLE; String data = record.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record); } public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) { @@ -225,7 +225,7 @@ public class BinlogManager { AlterJobRecord alterJobRecord = new AlterJobRecord(alterJob); String data = alterJobRecord.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJob); } public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) { @@ -236,7 +236,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS; String data = info.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); } public void addAlterDatabaseProperty(AlterDatabasePropertyInfo info, long commitSeq) { @@ -247,7 +247,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.ALTER_DATABASE_PROPERTY; String data = info.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true, info); } public void addModifyTableProperty(ModifyTablePropertyOperationLog info, long commitSeq) { @@ -258,7 +258,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.MODIFY_TABLE_PROPERTY; String data = info.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true, info); } // add Barrier log @@ -279,7 +279,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.BARRIER; String data = 
barrierLog.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, barrierLog); } // add Modify partitions @@ -291,7 +291,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.MODIFY_PARTITIONS; String data = info.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); } // add Replace partition @@ -303,7 +303,7 @@ public class BinlogManager { TBinlogType type = TBinlogType.REPLACE_PARTITIONS; String data = info.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); } // add Truncate Table @@ -316,7 +316,7 @@ public class BinlogManager { TruncateTableRecord record = new TruncateTableRecord(info); String data = record.toJson(); - addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); } // get binlog by dbId, return first binlog.version > version @@ -355,6 +355,20 @@ public class BinlogManager { } } + // get the dropped partitions of the db. 
+ public List<Long> getDroppedPartitions(long dbId) { + lock.readLock().lock(); + try { + DBBinlog dbBinlog = dbBinlogMap.get(dbId); + if (dbBinlog == null) { + return Lists.newArrayList(); + } + return dbBinlog.getDroppedPartitions(); + } finally { + lock.readLock().unlock(); + } + } + public List<BinlogTombstone> gc() { LOG.info("begin gc binlog"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 79e1adf20c9..b43805b06d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.common.Pair; import org.apache.doris.common.proc.BaseProcResult; +import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; @@ -40,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; public class DBBinlog { private static final Logger LOG = LogManager.getLogger(BinlogManager.class); @@ -58,6 +60,9 @@ public class DBBinlog { // need UpsertRecord to add timestamps for gc private List<Pair<Long, Long>> timestamps; + // The commit seq of the dropped partitions + private List<Pair<Long, Long>> droppedPartitions; + private List<TBinlog> tableDummyBinlogs; private BinlogConfigCache binlogConfigCache; @@ -73,6 +78,7 @@ public class DBBinlog { tableDummyBinlogs = Lists.newArrayList(); tableBinlogMap = Maps.newHashMap(); timestamps = Lists.newArrayList(); + droppedPartitions = Lists.newArrayList(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -110,6 +116,13 @@ public class DBBinlog { allBinlogs.add(binlog); binlogSize += 
BinlogUtils.getApproximateMemoryUsage(binlog); + if (binlog.getType() == TBinlogType.DROP_PARTITION) { + DropPartitionInfo info = DropPartitionInfo.fromJson(binlog.data); + if (info != null && info.getPartitionId() > 0) { + droppedPartitions.add(Pair.of(info.getPartitionId(), binlog.getCommitSeq())); + } + } + if (tableIds == null) { return; } @@ -139,7 +152,7 @@ public class DBBinlog { // guard by BinlogManager, if addBinlog called, more than one(db/tables) enable // binlog - public void addBinlog(TBinlog binlog) { + public void addBinlog(TBinlog binlog, Object raw) { boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId); List<Long> tableIds = binlog.getTableIds(); @@ -170,6 +183,13 @@ public class DBBinlog { break; } + if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) { + long partitionId = ((DropPartitionInfo) raw).getPartitionId(); + if (partitionId > 0) { + droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); + } + } + for (long tableId : tableIds) { TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable); if (tableBinlog != null) { @@ -205,6 +225,18 @@ public class DBBinlog { } } + // Get the dropped partitions of the db. + public List<Long> getDroppedPartitions() { + lock.readLock().lock(); + try { + return droppedPartitions.stream() + .map(v -> v.first) + .collect(Collectors.toList()); + } finally { + lock.readLock().unlock(); + } + } + public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); lock.readLock().lock(); @@ -293,6 +325,7 @@ public class DBBinlog { return tombstone; } + // remove expired binlogs and dropped partitions, used in disable db binlog gc. 
private void removeExpiredMetaData(long largestExpiredCommitSeq) { lock.writeLock().lock(); try { @@ -321,6 +354,7 @@ public class DBBinlog { } } + gcDroppedPartitions(largestExpiredCommitSeq); if (lastCommitSeq != -1) { dummy.setCommitSeq(lastCommitSeq); } @@ -331,6 +365,8 @@ public class DBBinlog { } } + // Get last expired binlog, and gc expired binlogs/timestamps/dropped + // partitions, used in enable db binlog gc. private TBinlog getLastExpiredBinlog(BinlogComparator checker) { TBinlog lastExpiredBinlog = null; @@ -355,6 +391,8 @@ public class DBBinlog { while (timeIter.hasNext() && timeIter.next().first <= lastExpiredBinlog.getCommitSeq()) { timeIter.remove(); } + + gcDroppedPartitions(lastExpiredBinlog.getCommitSeq()); } return lastExpiredBinlog; @@ -464,6 +502,13 @@ public class DBBinlog { } } + private void gcDroppedPartitions(long commitSeq) { + Iterator<Pair<Long, Long>> iter = droppedPartitions.iterator(); + while (iter.hasNext() && iter.next().second < commitSeq) { + iter.remove(); + } + } + // not thread safety, do this without lock public void getAllBinlogs(List<TBinlog> binlogs) { binlogs.addAll(tableDummyBinlogs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 36ec4f733ea..f3279b328c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -84,29 +84,28 @@ public class TableBinlog { public void recoverBinlog(TBinlog binlog) { TBinlog dummy = getDummyBinlog(); if (binlog.getCommitSeq() > dummy.getCommitSeq()) { - binlogs.add(binlog); - ++binlog.table_ref; - binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); - if (binlog.getTimestamp() > 0) { - timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); - } + addBinlogWithoutCheck(binlog); } } public void addBinlog(TBinlog binlog) { lock.writeLock().lock(); try { - 
binlogs.add(binlog); - ++binlog.table_ref; - binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); - if (binlog.getTimestamp() > 0) { - timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); - } + addBinlogWithoutCheck(binlog); } finally { lock.writeLock().unlock(); } } + private void addBinlogWithoutCheck(TBinlog binlog) { + binlogs.add(binlog); + ++binlog.table_ref; + binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); + if (binlog.getTimestamp() > 0) { + timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); + } + } + public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) { lock.readLock().lock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index dae44cdede1..ba877001cf8 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -5606,6 +5606,11 @@ public class Env { getTableMeta(olapTable, dbMeta); } + if (Config.enable_feature_binlog) { + BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager(); + dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId())); + } + result.setDbMeta(dbMeta); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 29a8d20321b..cfe94a1dded 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1760,11 +1760,11 @@ public class InternalCatalog implements CatalogIf<Database> { } // drop + Partition partition = null; long recycleTime = 0; if (isTempPartition) { olapTable.dropTempPartition(partitionName, true); } else { - Partition partition = null; if (!clause.isForceDrop()) { partition = olapTable.getPartition(partitionName); if (partition 
!= null) { @@ -1785,8 +1785,9 @@ public class InternalCatalog implements CatalogIf<Database> { } // log - DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName, isTempPartition, - clause.isForceDrop(), recycleTime); + long partitionId = partition == null ? -1L : partition.getId(); + DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionId, partitionName, + isTempPartition, clause.isForceDrop(), recycleTime); Env.getCurrentEnv().getEditLog().logDropPartition(info); LOG.info("succeed in dropping partition[{}], table : [{}-{}], is temp : {}, is force : {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java index f4fac915261..a7482f23d77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java @@ -32,6 +32,8 @@ public class DropPartitionInfo implements Writable { private Long dbId; @SerializedName(value = "tableId") private Long tableId; + @SerializedName(value = "pid") + private Long partitionId; @SerializedName(value = "partitionName") private String partitionName; @SerializedName(value = "isTempPartition") @@ -46,10 +48,11 @@ public class DropPartitionInfo implements Writable { private DropPartitionInfo() { } - public DropPartitionInfo(Long dbId, Long tableId, String partitionName, + public DropPartitionInfo(Long dbId, Long tableId, Long partitionId, String partitionName, boolean isTempPartition, boolean forceDrop, long recycleTime) { this.dbId = dbId; this.tableId = tableId; + this.partitionId = partitionId; this.partitionName = partitionName; this.isTempPartition = isTempPartition; this.forceDrop = forceDrop; @@ -75,6 +78,10 @@ public class DropPartitionInfo implements Writable { return tableId; } + public Long getPartitionId() { + return partitionId; + } + public String 
getPartitionName() { return partitionName; } @@ -91,13 +98,6 @@ public class DropPartitionInfo implements Writable { return recycleTime; } - @Deprecated - private void readFields(DataInput in) throws IOException { - dbId = in.readLong(); - tableId = in.readLong(); - partitionName = Text.readString(in); - } - public static DropPartitionInfo read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, DropPartitionInfo.class); @@ -109,6 +109,10 @@ public class DropPartitionInfo implements Writable { Text.writeString(out, json); } + public static DropPartitionInfo fromJson(String data) { + return GsonUtils.GSON.fromJson(data, DropPartitionInfo.class); + } + public String toJson() { return GsonUtils.GSON.toJson(this); } @@ -131,6 +135,7 @@ public class DropPartitionInfo implements Writable { return (dbId.equals(info.dbId)) && (tableId.equals(info.tableId)) + && (partitionId.equals(info.partitionId)) && (partitionName.equals(info.partitionName)) && (isTempPartition == info.isTempPartition) && (forceDrop == info.forceDrop) diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java index 4a94aacf60b..0713054b042 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java @@ -141,7 +141,7 @@ public class BinlogManagerTest { public void testGetBinlog() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { // reflect BinlogManager - Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class); + Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class, Object.class); addBinlog.setAccessible(true); // init binlog manager & addBinlog @@ -154,7 +154,7 @@ public class BinlogManagerTest { if (i % 2 == 0) { binlog.setType(TBinlogType.CREATE_TABLE); } - 
addBinlog.invoke(manager, binlog); + addBinlog.invoke(manager, binlog, null); } @@ -197,7 +197,7 @@ public class BinlogManagerTest { IOException, NoSuchFieldException { // reflect BinlogManager // addBinlog method - Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class); + Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class, Object.class); addBinlog.setAccessible(true); // dbBinlogMap Field dbBinlogMapField = BinlogManager.class.getDeclaredField("dbBinlogMap"); @@ -211,7 +211,7 @@ public class BinlogManagerTest { for (Map.Entry<Long, List<Long>> dbEntry : frameWork.entrySet()) { long dbId = dbEntry.getKey(); for (long tableId : dbEntry.getValue()) { - addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq)); + addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq), null); ++commitSeq; } } @@ -261,7 +261,7 @@ public class BinlogManagerTest { // reflect BinlogManager // addBinlog method - Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class); + Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class, Object.class); addBinlog.setAccessible(true); // dbBinlogMap Field dbBinlogMapField = BinlogManager.class.getDeclaredField("dbBinlogMap"); @@ -276,8 +276,8 @@ public class BinlogManagerTest { for (Map.Entry<Long, List<Long>> dbEntry : frameWork.entrySet()) { long dbId = dbEntry.getKey(); for (long tableId : dbEntry.getValue()) { - addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); - addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); + addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow), null); + addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow), null); ++commitSeq; } } @@ -325,7 +325,7 @@ public class 
BinlogManagerTest { // reflect BinlogManager // addBinlog method - Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class); + Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class, Object.class); addBinlog.setAccessible(true); // dbBinlogMap Field dbBinlogMapField = BinlogManager.class.getDeclaredField("dbBinlogMap"); @@ -341,8 +341,8 @@ public class BinlogManagerTest { long dbId = dbEntry.getKey(); for (long tableId : dbEntry.getValue()) { ++commitSeq; - addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq)); - addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq)); + addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq), null); + addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq), null); } } timeNow = commitSeq; diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java index b57bde598e5..06230bfce56 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java @@ -105,7 +105,7 @@ public class DbBinlogTest { if (dbBinlog == null) { dbBinlog = new DBBinlog(binlogConfigCache, testBinlogs.get(i)); } - dbBinlog.addBinlog(testBinlogs.get(i)); + dbBinlog.addBinlog(testBinlogs.get(i), null); } // trigger gc @@ -185,7 +185,7 @@ public class DbBinlogTest { if (dbBinlog == null) { dbBinlog = new DBBinlog(binlogConfigCache, binlog); } - dbBinlog.addBinlog(binlog); + dbBinlog.addBinlog(binlog, null); } // trigger gc @@ -236,7 +236,7 @@ public class DbBinlogTest { if (dbBinlog == null) { dbBinlog = new DBBinlog(binlogConfigCache, binlog); } - dbBinlog.addBinlog(binlog); + dbBinlog.addBinlog(binlog, null); } // trigger gc @@ -286,7 +286,7 @@ public class DbBinlogTest { 
binlog.setType(type); DBBinlog dbBinlog = new DBBinlog(new BinlogConfigCache(), binlog); - dbBinlog.addBinlog(binlog); + dbBinlog.addBinlog(binlog, null); TreeSet<TBinlog> allbinlogs = (TreeSet<TBinlog>) allBinlogsField.get(dbBinlog); Map<Long, TableBinlog> tableBinlogMap = (Map<Long, TableBinlog>) tableBinlogMapField.get(dbBinlog); diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/DropPartitionInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/DropPartitionInfoTest.java index 9adb0ecb60d..17befd82f53 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/DropPartitionInfoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/DropPartitionInfoTest.java @@ -41,7 +41,7 @@ public class DropPartitionInfoTest { file.createNewFile(); DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); - DropPartitionInfo info1 = new DropPartitionInfo(1L, 2L, "test_partition", false, true, 0); + DropPartitionInfo info1 = new DropPartitionInfo(1L, 2L, 3L, "test_partition", false, true, 0); info1.write(dos); dos.flush(); @@ -54,18 +54,19 @@ public class DropPartitionInfoTest { Assert.assertEquals(Long.valueOf(1L), rInfo1.getDbId()); Assert.assertEquals(Long.valueOf(2L), rInfo1.getTableId()); + Assert.assertEquals(Long.valueOf(3L), rInfo1.getPartitionId()); Assert.assertEquals("test_partition", rInfo1.getPartitionName()); Assert.assertFalse(rInfo1.isTempPartition()); Assert.assertTrue(rInfo1.isForceDrop()); Assert.assertEquals(rInfo1, info1); Assert.assertNotEquals(rInfo1, this); - Assert.assertNotEquals(info1, new DropPartitionInfo(-1L, 2L, "test_partition", false, true, 0)); - Assert.assertNotEquals(info1, new DropPartitionInfo(1L, -2L, "test_partition", false, true, 0)); - Assert.assertNotEquals(info1, new DropPartitionInfo(1L, 2L, "test_partition1", false, true, 0)); - Assert.assertNotEquals(info1, new DropPartitionInfo(1L, 2L, "test_partition", true, true, 0)); - Assert.assertNotEquals(info1, new DropPartitionInfo(1L, 
2L, "test_partition", false, false, 0)); - Assert.assertEquals(info1, new DropPartitionInfo(1L, 2L, "test_partition", false, true, 0)); + Assert.assertNotEquals(info1, new DropPartitionInfo(-1L, 2L, 3L, "test_partition", false, true, 0)); + Assert.assertNotEquals(info1, new DropPartitionInfo(1L, -2L, 3L, "test_partition", false, true, 0)); + Assert.assertNotEquals(info1, new DropPartitionInfo(1L, 2L, 3L, "test_partition1", false, true, 0)); + Assert.assertNotEquals(info1, new DropPartitionInfo(1L, 2L, 3L, "test_partition", true, true, 0)); + Assert.assertNotEquals(info1, new DropPartitionInfo(1L, 2L, 3L, "test_partition", false, false, 0)); + Assert.assertEquals(info1, new DropPartitionInfo(1L, 2L, 3L, "test_partition", false, true, 0)); // 3. delete files dis.close(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 692c20680ca..953878fcfc3 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1223,6 +1223,7 @@ struct TGetMetaDBMeta { 1: optional i64 id 2: optional string name 3: optional list<TGetMetaTableMeta> tables + 4: optional list<i64> dropped_partitions } struct TGetMetaResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org