This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ed8cb6a  [Feature][Meta]Update/Read/Write VisibleVersionTime for 
Partition#4076 (#4086)
ed8cb6a is described below

commit ed8cb6a002642e43c0ebe3346a5f3bc486f3a0a2
Author: HaiBo Li <liha...@vip.126.com>
AuthorDate: Sun Jul 26 21:20:55 2020 +0800

    [Feature][Meta]Update/Read/Write VisibleVersionTime for Partition#4076 
(#4086)
    
    #4076
    1. The visibleVersionTime is updated when data is inserted into a partition
    2. GlobalTransactionMgr calls 
partition.updateVisibleVersionAndVersionHash(version, versionHash) when the FE is 
restarted
    3. If the FE restarts, VisibleVersionTime may change, but the new value 
is newer than the old value
---
 .../java/org/apache/doris/catalog/Partition.java   | 36 +++++++++++++--
 .../org/apache/doris/common/FeMetaVersion.java     |  4 +-
 .../doris/common/proc/PartitionsProcDir.java       |  4 +-
 .../doris/transaction/DatabaseTransactionMgr.java  |  7 +--
 .../doris/transaction/PartitionCommitInfo.java     | 54 +++++++++++++++-------
 .../apache/doris/transaction/TableCommitInfo.java  | 10 ++--
 6 files changed, 84 insertions(+), 31 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 0730bed..14fd6ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -92,6 +92,8 @@ public class Partition extends MetaObject implements Writable 
{
     private long committedVersionHash;
     @SerializedName(value = "visibleVersion")
     private long visibleVersion;
+    @SerializedName(value = "visibleVersionTime")
+    private long visibleVersionTime;
     @SerializedName(value = "visibleVersionHash")
     private long visibleVersionHash;
     @SerializedName(value = "nextVersion")
@@ -113,6 +115,7 @@ public class Partition extends MetaObject implements 
Writable {
         this.baseIndex = baseIndex;
 
         this.visibleVersion = PARTITION_INIT_VERSION;
+        this.visibleVersionTime = System.currentTimeMillis();
         this.visibleVersionHash = PARTITION_INIT_VERSION_HASH;
         // PARTITION_INIT_VERSION == 1, so the first load version is 2 !!!
         this.nextVersion = PARTITION_INIT_VERSION + 1;
@@ -147,8 +150,7 @@ public class Partition extends MetaObject implements 
Writable {
      * the restored partition version info》
      */
     public void updateVersionForRestore(long visibleVersion, long 
visibleVersionHash) {
-        this.visibleVersion = visibleVersion;
-        this.visibleVersionHash = visibleVersionHash;
+        this.setVisibleVersion(visibleVersion, visibleVersionHash);
         this.nextVersion = this.visibleVersion + 1;
         this.nextVersionHash = Util.generateVersionHash();
         this.committedVersionHash = visibleVersionHash;
@@ -157,8 +159,11 @@ public class Partition extends MetaObject implements 
Writable {
     }
 
     public void updateVisibleVersionAndVersionHash(long visibleVersion, long 
visibleVersionHash) {
-        this.visibleVersion = visibleVersion;
-        this.visibleVersionHash = visibleVersionHash;
+        updateVisibleVersionAndVersionHash(visibleVersion, 
System.currentTimeMillis(), visibleVersionHash);
+    }
+
+    public void updateVisibleVersionAndVersionHash(long visibleVersion, long 
visibleVersionTime, long visibleVersionHash) {
+        this.setVisibleVersion(visibleVersion, visibleVersionTime, 
visibleVersionHash);
         if (MetaContext.get() != null) {
             // MetaContext is not null means we are in a edit log replay 
thread.
             // if it is upgrade from old palo cluster, then should update next 
version info
@@ -181,9 +186,26 @@ public class Partition extends MetaObject implements 
Writable {
         return visibleVersion;
     }
 
+    public long getVisibleVersionTime() {
+        return visibleVersionTime;
+    }
+
     public long getVisibleVersionHash() {
         return visibleVersionHash;
     }
+    
+    // updateVisibleVersionAndVersionHash is called when the FE restarts, so 
visibleVersionTime is updated at that point as well
+    private void setVisibleVersion(long visibleVersion, long 
visibleVersionHash) {
+        this.visibleVersion = visibleVersion;
+        this.visibleVersionTime = System.currentTimeMillis();
+        this.visibleVersionHash = visibleVersionHash;
+    }
+
+    public void setVisibleVersion(long visibleVersion, long 
visibleVersionTime, long visibleVersionHash) {
+        this.visibleVersion = visibleVersion;
+        this.visibleVersionTime = visibleVersionTime;
+        this.visibleVersionHash = visibleVersionHash;
+    }
 
     public PartitionState getState() {
         return this.state;
@@ -344,6 +366,7 @@ public class Partition extends MetaObject implements 
Writable {
         }
 
         out.writeLong(visibleVersion);
+        out.writeLong(visibleVersionTime);
         out.writeLong(visibleVersionHash);
 
         out.writeLong(nextVersion);
@@ -379,6 +402,11 @@ public class Partition extends MetaObject implements 
Writable {
         }
 
         visibleVersion = in.readLong();
+        if (Catalog.getCurrentCatalogJournalVersion() >= 
FeMetaVersion.VERSION_88) {
+            visibleVersionTime = in.readLong();
+        } else {
+            visibleVersionTime = System.currentTimeMillis();             
+        }
         visibleVersionHash = in.readLong();
         if (Catalog.getCurrentCatalogJournalVersion() >= 
FeMetaVersion.VERSION_45) {
             nextVersion = in.readLong();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 7c52c94..ec7205f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -185,6 +185,8 @@ public final class FeMetaVersion {
     public static final int VERSION_86 = 86;
     // spark resource, resource privilege, broker file group for hive table
     public static final int VERSION_87 = 87;
+    // add partition visibleVersionTime
+    public static final int VERSION_88 = 88;
     // note: when increment meta version, should assign the latest version to 
VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_87;
+    public static final int VERSION_CURRENT = VERSION_88;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 5cd64cd..551407a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -64,7 +64,8 @@ import java.util.stream.Collectors;
  */
 public class PartitionsProcDir implements ProcDirInterface {
     public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            
.add("PartitionId").add("PartitionName").add("VisibleVersion").add("VisibleVersionHash")
+            .add("PartitionId").add("PartitionName")
+            
.add("VisibleVersion").add("VisibleVersionTime").add("VisibleVersionHash")
             
.add("State").add("PartitionKey").add("Range").add("DistributionKey")
             
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime")
             .add("LastConsistencyCheckTime")
@@ -231,6 +232,7 @@ public class PartitionsProcDir implements ProcDirInterface {
                 partitionInfo.add(partitionId);
                 partitionInfo.add(partitionName);
                 partitionInfo.add(partition.getVisibleVersion());
+                
partitionInfo.add(TimeUtils.longToTimeString(partition.getVisibleVersionTime()));
                 partitionInfo.add(partition.getVisibleVersionHash());
                 partitionInfo.add(partition.getState());
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 2c40344..8c68dfa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -815,8 +815,8 @@ public class DatabaseTransactionMgr {
                 OlapTable table = (OlapTable) db.getTable(tableId);
                 Partition partition = table.getPartition(partitionId);
                 PartitionCommitInfo partitionCommitInfo = new 
PartitionCommitInfo(partitionId,
-                        partition.getNextVersion(),
-                        partition.getNextVersionHash());
+                        partition.getNextVersion(), 
partition.getNextVersionHash(),
+                        System.currentTimeMillis() /* use as partition visible 
time */);
                 tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
             }
             transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
@@ -1285,8 +1285,9 @@ public class DatabaseTransactionMgr {
                     }
                 } // end for indices
                 long version = partitionCommitInfo.getVersion();
+                long versionTime = partitionCommitInfo.getVersionTime();
                 long versionHash = partitionCommitInfo.getVersionHash();
-                partition.updateVisibleVersionAndVersionHash(version, 
versionHash);
+                partition.updateVisibleVersionAndVersionHash(version, 
versionTime, versionHash);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("transaction state {} set partition {}'s version 
to [{}] and version hash to [{}]",
                             transactionState, partition.getId(), version, 
versionHash);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
index c22a376..b88d3e6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
@@ -17,40 +17,57 @@
 
 package org.apache.doris.transaction;
 
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.doris.common.io.Writable;
-
 public class PartitionCommitInfo implements Writable {
 
+    @SerializedName(value = "partitionId")
     private long partitionId;
+    @SerializedName(value = "version")
     private long version;
+    @SerializedName(value = "versionTime")
+    private long versionTime;
+    @SerializedName(value = "versionHash")
     private long versionHash;
 
     public PartitionCommitInfo() {
         
     }
 
-    public PartitionCommitInfo(long partitionId, long version, long 
versionHash) {
+    public PartitionCommitInfo(long partitionId, long version, long 
versionHash, long visibleTime) {
         super();
         this.partitionId = partitionId;
         this.version = version;
+        this.versionTime = visibleTime;
         this.versionHash = versionHash;
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
-        out.writeLong(partitionId);
-        out.writeLong(version);
-        out.writeLong(versionHash);
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
     }
 
-    public void readFields(DataInput in) throws IOException {
-        partitionId = in.readLong();
-        version = in.readLong();
-        versionHash = in.readLong();
+    public static PartitionCommitInfo read(DataInput in) throws IOException {
+        if (Catalog.getCurrentCatalogJournalVersion() < 
FeMetaVersion.VERSION_88) {
+            long partitionId = in.readLong();
+            long version = in.readLong();
+            long versionHash = in.readLong();
+            return new PartitionCommitInfo(partitionId, version, versionHash, 
System.currentTimeMillis());
+        } else {
+            String json = Text.readString(in);
+            return GsonUtils.GSON.fromJson(json, PartitionCommitInfo.class);
+        }
     }
 
     public long getPartitionId() {
@@ -60,6 +77,10 @@ public class PartitionCommitInfo implements Writable {
     public long getVersion() {
         return version;
     }
+    
+    public long getVersionTime() {
+        return versionTime;
+    }
 
     public long getVersionHash() {
         return versionHash;
@@ -67,12 +88,11 @@ public class PartitionCommitInfo implements Writable {
     
     @Override
     public String toString() {
-        StringBuffer strBuffer = new StringBuffer("partitionid=");
-        strBuffer.append(partitionId);
-        strBuffer.append(", version=");
-        strBuffer.append(version);
-        strBuffer.append(", versionHash=");
-        strBuffer.append(versionHash);
-        return strBuffer.toString();
+        StringBuilder sb = new StringBuilder("partitionid=");
+        sb.append(partitionId);
+        sb.append(", version=").append(version);
+        sb.append(", versionHash=").append(versionHash);
+        sb.append(", versionTime=").append(versionTime);
+        return sb.toString();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
index 024ea74..e59206a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TableCommitInfo.java
@@ -17,14 +17,15 @@
 
 package org.apache.doris.transaction;
 
+import org.apache.doris.common.io.Writable;
+
+import com.google.common.collect.Maps;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.doris.common.io.Writable;
-import com.google.common.collect.Maps;
-
 public class TableCommitInfo implements Writable {
     
     private long tableId;
@@ -60,8 +61,7 @@ public class TableCommitInfo implements Writable {
         if (hasPartitionInfo) {
             int elementNum = in.readInt();
             for (int i = 0; i < elementNum; ++i) {
-                PartitionCommitInfo partitionCommitInfo = new 
PartitionCommitInfo();
-                partitionCommitInfo.readFields(in);
+                PartitionCommitInfo partitionCommitInfo = 
PartitionCommitInfo.read(in);
                 
idToPartitionCommitInfo.put(partitionCommitInfo.getPartitionId(), 
partitionCommitInfo);
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to