This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new ed8cb6a [Feature][Meta]Update/Read/Write VisibleVersionTime for Partition#4076 (#4086) ed8cb6a is described below commit ed8cb6a002642e43c0ebe3346a5f3bc486f3a0a2 Author: HaiBo Li <liha...@vip.126.com> AuthorDate: Sun Jul 26 21:20:55 2020 +0800 [Feature][Meta]Update/Read/Write VisibleVersionTime for Partition#4076 (#4086) #4076 1. The visibleVersionTime is updated when data is inserted into a partition. 2. GlobalTransactionMgr calls partition.updateVisibleVersionAndVersionHash(version, versionHash) when the FE is restarted. 3. If the FE restarts, visibleVersionTime may change, but the new value is newer than the old value. --- .../java/org/apache/doris/catalog/Partition.java | 36 +++++++++++++-- .../org/apache/doris/common/FeMetaVersion.java | 4 +- .../doris/common/proc/PartitionsProcDir.java | 4 +- .../doris/transaction/DatabaseTransactionMgr.java | 7 +-- .../doris/transaction/PartitionCommitInfo.java | 54 +++++++++++++++------- .../apache/doris/transaction/TableCommitInfo.java | 10 ++-- 6 files changed, 84 insertions(+), 31 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 0730bed..14fd6ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -92,6 +92,8 @@ public class Partition extends MetaObject implements Writable { private long committedVersionHash; @SerializedName(value = "visibleVersion") private long visibleVersion; + @SerializedName(value = "visibleVersionTime") + private long visibleVersionTime; @SerializedName(value = "visibleVersionHash") private long visibleVersionHash; @SerializedName(value = "nextVersion") @@ -113,6 +115,7 @@ public class Partition extends MetaObject implements Writable { this.baseIndex = baseIndex; this.visibleVersion = PARTITION_INIT_VERSION; + 
this.visibleVersionTime = System.currentTimeMillis(); this.visibleVersionHash = PARTITION_INIT_VERSION_HASH; // PARTITION_INIT_VERSION == 1, so the first load version is 2 !!! this.nextVersion = PARTITION_INIT_VERSION + 1; @@ -147,8 +150,7 @@ public class Partition extends MetaObject implements Writable { * the restored partition version info》 */ public void updateVersionForRestore(long visibleVersion, long visibleVersionHash) { - this.visibleVersion = visibleVersion; - this.visibleVersionHash = visibleVersionHash; + this.setVisibleVersion(visibleVersion, visibleVersionHash); this.nextVersion = this.visibleVersion + 1; this.nextVersionHash = Util.generateVersionHash(); this.committedVersionHash = visibleVersionHash; @@ -157,8 +159,11 @@ public class Partition extends MetaObject implements Writable { } public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionHash) { - this.visibleVersion = visibleVersion; - this.visibleVersionHash = visibleVersionHash; + updateVisibleVersionAndVersionHash(visibleVersion, System.currentTimeMillis(), visibleVersionHash); + } + + public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionTime, long visibleVersionHash) { + this.setVisibleVersion(visibleVersion, visibleVersionTime, visibleVersionHash); if (MetaContext.get() != null) { // MetaContext is not null means we are in a edit log replay thread. 
// if it is upgrade from old palo cluster, then should update next version info @@ -181,9 +186,26 @@ public class Partition extends MetaObject implements Writable { return visibleVersion; } + public long getVisibleVersionTime() { + return visibleVersionTime; + } + public long getVisibleVersionHash() { return visibleVersionHash; } + + // The method updateVisibleVersionAndVersionHash is called when fe restart, the visibleVersionTime is updated + private void setVisibleVersion(long visibleVersion, long visibleVersionHash) { + this.visibleVersion = visibleVersion; + this.visibleVersionTime = System.currentTimeMillis(); + this.visibleVersionHash = visibleVersionHash; + } + + public void setVisibleVersion(long visibleVersion, long visibleVersionTime, long visibleVersionHash) { + this.visibleVersion = visibleVersion; + this.visibleVersionTime = visibleVersionTime; + this.visibleVersionHash = visibleVersionHash; + } public PartitionState getState() { return this.state; @@ -344,6 +366,7 @@ public class Partition extends MetaObject implements Writable { } out.writeLong(visibleVersion); + out.writeLong(visibleVersionTime); out.writeLong(visibleVersionHash); out.writeLong(nextVersion); @@ -379,6 +402,11 @@ public class Partition extends MetaObject implements Writable { } visibleVersion = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_88) { + visibleVersionTime = in.readLong(); + } else { + visibleVersionTime = System.currentTimeMillis(); + } visibleVersionHash = in.readLong(); if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_45) { nextVersion = in.readLong(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index 7c52c94..ec7205f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -185,6 +185,8 @@ public final 
class FeMetaVersion { public static final int VERSION_86 = 86; // spark resource, resource privilege, broker file group for hive table public static final int VERSION_87 = 87; + // add partition visibleVersionTime + public static final int VERSION_88 = 88; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_87; + public static final int VERSION_CURRENT = VERSION_88; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index 5cd64cd..551407a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -64,7 +64,8 @@ import java.util.stream.Collectors; */ public class PartitionsProcDir implements ProcDirInterface { public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() - .add("PartitionId").add("PartitionName").add("VisibleVersion").add("VisibleVersionHash") + .add("PartitionId").add("PartitionName") + .add("VisibleVersion").add("VisibleVersionTime").add("VisibleVersionHash") .add("State").add("PartitionKey").add("Range").add("DistributionKey") .add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime") .add("LastConsistencyCheckTime") @@ -231,6 +232,7 @@ public class PartitionsProcDir implements ProcDirInterface { partitionInfo.add(partitionId); partitionInfo.add(partitionName); partitionInfo.add(partition.getVisibleVersion()); + partitionInfo.add(TimeUtils.longToTimeString(partition.getVisibleVersionTime())); partitionInfo.add(partition.getVisibleVersionHash()); partitionInfo.add(partition.getState()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 
2c40344..8c68dfa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -815,8 +815,8 @@ public class DatabaseTransactionMgr { OlapTable table = (OlapTable) db.getTable(tableId); Partition partition = table.getPartition(partitionId); PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId, - partition.getNextVersion(), - partition.getNextVersionHash()); + partition.getNextVersion(), partition.getNextVersionHash(), + System.currentTimeMillis() /* use as partition visible time */); tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo); } transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); @@ -1285,8 +1285,9 @@ public class DatabaseTransactionMgr { } } // end for indices long version = partitionCommitInfo.getVersion(); + long versionTime = partitionCommitInfo.getVersionTime(); long versionHash = partitionCommitInfo.getVersionHash(); - partition.updateVisibleVersionAndVersionHash(version, versionHash); + partition.updateVisibleVersionAndVersionHash(version, versionTime, versionHash); if (LOG.isDebugEnabled()) { LOG.debug("transaction state {} set partition {}'s version to [{}] and version hash to [{}]", transactionState, partition.getId(), version, versionHash); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java index c22a376..b88d3e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java @@ -17,40 +17,57 @@ package org.apache.doris.transaction; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import 
org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.doris.common.io.Writable; - public class PartitionCommitInfo implements Writable { + @SerializedName(value = "partitionId") private long partitionId; + @SerializedName(value = "version") private long version; + @SerializedName(value = "versionTime") + private long versionTime; + @SerializedName(value = "versionHash") private long versionHash; public PartitionCommitInfo() { } - public PartitionCommitInfo(long partitionId, long version, long versionHash) { + public PartitionCommitInfo(long partitionId, long version, long versionHash, long visibleTime) { super(); this.partitionId = partitionId; this.version = version; + this.versionTime = visibleTime; this.versionHash = versionHash; } @Override public void write(DataOutput out) throws IOException { - out.writeLong(partitionId); - out.writeLong(version); - out.writeLong(versionHash); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } - public void readFields(DataInput in) throws IOException { - partitionId = in.readLong(); - version = in.readLong(); - versionHash = in.readLong(); + public static PartitionCommitInfo read(DataInput in) throws IOException { + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_88) { + long partitionId = in.readLong(); + long version = in.readLong(); + long versionHash = in.readLong(); + return new PartitionCommitInfo(partitionId, version, versionHash, System.currentTimeMillis()); + } else { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, PartitionCommitInfo.class); + } } public long getPartitionId() { @@ -60,6 +77,10 @@ public class PartitionCommitInfo implements Writable { public long getVersion() { return version; } + + public long getVersionTime() { + return versionTime; + } public long getVersionHash() { return 
versionHash; @@ -67,12 +88,11 @@ public class PartitionCommitInfo implements Writable { @Override public String toString() { - StringBuffer strBuffer = new StringBuffer("partitionid="); - strBuffer.append(partitionId); - strBuffer.append(", version="); - strBuffer.append(version); - strBuffer.append(", versionHash="); - strBuffer.append(versionHash); - return strBuffer.toString(); + StringBuilder sb = new StringBuilder("partitionid="); + sb.append(partitionId); + sb.append(", version=").append(version); + sb.append(", versionHash=").append(versionHash); + sb.append(", versionTime=").append(versionTime); + return sb.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java index 024ea74..e59206a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java @@ -17,14 +17,15 @@ package org.apache.doris.transaction; +import org.apache.doris.common.io.Writable; + +import com.google.common.collect.Maps; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Map; -import org.apache.doris.common.io.Writable; -import com.google.common.collect.Maps; - public class TableCommitInfo implements Writable { private long tableId; @@ -60,8 +61,7 @@ public class TableCommitInfo implements Writable { if (hasPartitionInfo) { int elementNum = in.readInt(); for (int i = 0; i < elementNum; ++i) { - PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(); - partitionCommitInfo.readFields(in); + PartitionCommitInfo partitionCommitInfo = PartitionCommitInfo.read(in); idToPartitionCommitInfo.put(partitionCommitInfo.getPartitionId(), partitionCommitInfo); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, 
e-mail: commits-h...@doris.apache.org