This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ed8cb6a [Feature][Meta]Update/Read/Write VisibleVersionTime for
Partition#4076 (#4086)
ed8cb6a is described below
commit ed8cb6a002642e43c0ebe3346a5f3bc486f3a0a2
Author: HaiBo Li <[email protected]>
AuthorDate: Sun Jul 26 21:20:55 2020 +0800
[Feature][Meta]Update/Read/Write VisibleVersionTime for Partition#4076
(#4086)
#4076
1. The visibleVersionTime is updated when data is inserted into a partition
2. GlobalTransactionMgr calls
partition.updateVisibleVersionAndVersionHash(version, versionHash) when the FE is
restarted
3. If the FE restarts, visibleVersionTime may change, but the new value
is newer than the old value
---
.../java/org/apache/doris/catalog/Partition.java | 36 +++++++++++++--
.../org/apache/doris/common/FeMetaVersion.java | 4 +-
.../doris/common/proc/PartitionsProcDir.java | 4 +-
.../doris/transaction/DatabaseTransactionMgr.java | 7 +--
.../doris/transaction/PartitionCommitInfo.java | 54 +++++++++++++++-------
.../apache/doris/transaction/TableCommitInfo.java | 10 ++--
6 files changed, 84 insertions(+), 31 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 0730bed..14fd6ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -92,6 +92,8 @@ public class Partition extends MetaObject implements Writable
{
private long committedVersionHash;
@SerializedName(value = "visibleVersion")
private long visibleVersion;
+ @SerializedName(value = "visibleVersionTime")
+ private long visibleVersionTime;
@SerializedName(value = "visibleVersionHash")
private long visibleVersionHash;
@SerializedName(value = "nextVersion")
@@ -113,6 +115,7 @@ public class Partition extends MetaObject implements
Writable {
this.baseIndex = baseIndex;
this.visibleVersion = PARTITION_INIT_VERSION;
+ this.visibleVersionTime = System.currentTimeMillis();
this.visibleVersionHash = PARTITION_INIT_VERSION_HASH;
// PARTITION_INIT_VERSION == 1, so the first load version is 2 !!!
this.nextVersion = PARTITION_INIT_VERSION + 1;
@@ -147,8 +150,7 @@ public class Partition extends MetaObject implements
Writable {
* the restored partition version info》
*/
public void updateVersionForRestore(long visibleVersion, long
visibleVersionHash) {
- this.visibleVersion = visibleVersion;
- this.visibleVersionHash = visibleVersionHash;
+ this.setVisibleVersion(visibleVersion, visibleVersionHash);
this.nextVersion = this.visibleVersion + 1;
this.nextVersionHash = Util.generateVersionHash();
this.committedVersionHash = visibleVersionHash;
@@ -157,8 +159,11 @@ public class Partition extends MetaObject implements
Writable {
}
public void updateVisibleVersionAndVersionHash(long visibleVersion, long
visibleVersionHash) {
- this.visibleVersion = visibleVersion;
- this.visibleVersionHash = visibleVersionHash;
+ updateVisibleVersionAndVersionHash(visibleVersion,
System.currentTimeMillis(), visibleVersionHash);
+ }
+
+ public void updateVisibleVersionAndVersionHash(long visibleVersion, long
visibleVersionTime, long visibleVersionHash) {
+ this.setVisibleVersion(visibleVersion, visibleVersionTime,
visibleVersionHash);
if (MetaContext.get() != null) {
// MetaContext is not null means we are in a edit log replay
thread.
// if it is upgrade from old palo cluster, then should update next
version info
@@ -181,9 +186,26 @@ public class Partition extends MetaObject implements
Writable {
return visibleVersion;
}
+ public long getVisibleVersionTime() {
+ return visibleVersionTime;
+ }
+
public long getVisibleVersionHash() {
return visibleVersionHash;
}
+
+ // The method updateVisibleVersionAndVersionHash is called when fe
restart, the visibleVersionTime is updated
+ private void setVisibleVersion(long visibleVersion, long
visibleVersionHash) {
+ this.visibleVersion = visibleVersion;
+ this.visibleVersionTime = System.currentTimeMillis();
+ this.visibleVersionHash = visibleVersionHash;
+ }
+
+ public void setVisibleVersion(long visibleVersion, long
visibleVersionTime, long visibleVersionHash) {
+ this.visibleVersion = visibleVersion;
+ this.visibleVersionTime = visibleVersionTime;
+ this.visibleVersionHash = visibleVersionHash;
+ }
public PartitionState getState() {
return this.state;
@@ -344,6 +366,7 @@ public class Partition extends MetaObject implements
Writable {
}
out.writeLong(visibleVersion);
+ out.writeLong(visibleVersionTime);
out.writeLong(visibleVersionHash);
out.writeLong(nextVersion);
@@ -379,6 +402,11 @@ public class Partition extends MetaObject implements
Writable {
}
visibleVersion = in.readLong();
+ if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.VERSION_88) {
+ visibleVersionTime = in.readLong();
+ } else {
+ visibleVersionTime = System.currentTimeMillis();
+ }
visibleVersionHash = in.readLong();
if (Catalog.getCurrentCatalogJournalVersion() >=
FeMetaVersion.VERSION_45) {
nextVersion = in.readLong();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 7c52c94..ec7205f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -185,6 +185,8 @@ public final class FeMetaVersion {
public static final int VERSION_86 = 86;
// spark resource, resource privilege, broker file group for hive table
public static final int VERSION_87 = 87;
+ // add partition visibleVersionTime
+ public static final int VERSION_88 = 88;
// note: when increment meta version, should assign the latest version to
VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_87;
+ public static final int VERSION_CURRENT = VERSION_88;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 5cd64cd..551407a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -64,7 +64,8 @@ import java.util.stream.Collectors;
*/
public class PartitionsProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
-
.add("PartitionId").add("PartitionName").add("VisibleVersion").add("VisibleVersionHash")
+ .add("PartitionId").add("PartitionName")
+
.add("VisibleVersion").add("VisibleVersionTime").add("VisibleVersionHash")
.add("State").add("PartitionKey").add("Range").add("DistributionKey")
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime")
.add("LastConsistencyCheckTime")
@@ -231,6 +232,7 @@ public class PartitionsProcDir implements ProcDirInterface {
partitionInfo.add(partitionId);
partitionInfo.add(partitionName);
partitionInfo.add(partition.getVisibleVersion());
+
partitionInfo.add(TimeUtils.longToTimeString(partition.getVisibleVersionTime()));
partitionInfo.add(partition.getVisibleVersionHash());
partitionInfo.add(partition.getState());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 2c40344..8c68dfa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -815,8 +815,8 @@ public class DatabaseTransactionMgr {
OlapTable table = (OlapTable) db.getTable(tableId);
Partition partition = table.getPartition(partitionId);
PartitionCommitInfo partitionCommitInfo = new
PartitionCommitInfo(partitionId,
- partition.getNextVersion(),
- partition.getNextVersionHash());
+ partition.getNextVersion(),
partition.getNextVersionHash(),
+ System.currentTimeMillis() /* use as partition visible
time */);
tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
@@ -1285,8 +1285,9 @@ public class DatabaseTransactionMgr {
}
} // end for indices
long version = partitionCommitInfo.getVersion();
+ long versionTime = partitionCommitInfo.getVersionTime();
long versionHash = partitionCommitInfo.getVersionHash();
- partition.updateVisibleVersionAndVersionHash(version,
versionHash);
+ partition.updateVisibleVersionAndVersionHash(version,
versionTime, versionHash);
if (LOG.isDebugEnabled()) {
LOG.debug("transaction state {} set partition {}'s version
to [{}] and version hash to [{}]",
transactionState, partition.getId(), version,
versionHash);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
index c22a376..b88d3e6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
@@ -17,40 +17,57 @@
package org.apache.doris.transaction;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.doris.common.io.Writable;
-
public class PartitionCommitInfo implements Writable {
+ @SerializedName(value = "partitionId")
private long partitionId;
+ @SerializedName(value = "version")
private long version;
+ @SerializedName(value = "versionTime")
+ private long versionTime;
+ @SerializedName(value = "versionHash")
private long versionHash;
public PartitionCommitInfo() {
}
- public PartitionCommitInfo(long partitionId, long version, long
versionHash) {
+ public PartitionCommitInfo(long partitionId, long version, long
versionHash, long visibleTime) {
super();
this.partitionId = partitionId;
this.version = version;
+ this.versionTime = visibleTime;
this.versionHash = versionHash;
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeLong(partitionId);
- out.writeLong(version);
- out.writeLong(versionHash);
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
}
- public void readFields(DataInput in) throws IOException {
- partitionId = in.readLong();
- version = in.readLong();
- versionHash = in.readLong();
+ public static PartitionCommitInfo read(DataInput in) throws IOException {
+ if (Catalog.getCurrentCatalogJournalVersion() <
FeMetaVersion.VERSION_88) {
+ long partitionId = in.readLong();
+ long version = in.readLong();
+ long versionHash = in.readLong();
+ return new PartitionCommitInfo(partitionId, version, versionHash,
System.currentTimeMillis());
+ } else {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, PartitionCommitInfo.class);
+ }
}
public long getPartitionId() {
@@ -60,6 +77,10 @@ public class PartitionCommitInfo implements Writable {
public long getVersion() {
return version;
}
+
+ public long getVersionTime() {
+ return versionTime;
+ }
public long getVersionHash() {
return versionHash;
@@ -67,12 +88,11 @@ public class PartitionCommitInfo implements Writable {
@Override
public String toString() {
- StringBuffer strBuffer = new StringBuffer("partitionid=");
- strBuffer.append(partitionId);
- strBuffer.append(", version=");
- strBuffer.append(version);
- strBuffer.append(", versionHash=");
- strBuffer.append(versionHash);
- return strBuffer.toString();
+ StringBuilder sb = new StringBuilder("partitionid=");
+ sb.append(partitionId);
+ sb.append(", version=").append(version);
+ sb.append(", versionHash=").append(versionHash);
+ sb.append(", versionTime=").append(versionTime);
+ return sb.toString();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
index 024ea74..e59206a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
@@ -17,14 +17,15 @@
package org.apache.doris.transaction;
+import org.apache.doris.common.io.Writable;
+
+import com.google.common.collect.Maps;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
-import org.apache.doris.common.io.Writable;
-import com.google.common.collect.Maps;
-
public class TableCommitInfo implements Writable {
private long tableId;
@@ -60,8 +61,7 @@ public class TableCommitInfo implements Writable {
if (hasPartitionInfo) {
int elementNum = in.readInt();
for (int i = 0; i < elementNum; ++i) {
- PartitionCommitInfo partitionCommitInfo = new
PartitionCommitInfo();
- partitionCommitInfo.readFields(in);
+ PartitionCommitInfo partitionCommitInfo =
PartitionCommitInfo.read(in);
idToPartitionCommitInfo.put(partitionCommitInfo.getPartitionId(),
partitionCommitInfo);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]