This is an automated email from the ASF dual-hosted git repository.

liuhangyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 30df9fc  Serialize origin stmt in Rollup Job and MV Meta (#3705)
30df9fc is described below

commit 30df9fcae96419b6a9674792d6bef19d77d68dee
Author: EmmyMiao87 <522274...@qq.com>
AuthorDate: Sat May 30 20:17:46 2020 +0800

    Serialize origin stmt in Rollup Job and MV Meta (#3705)
    
    * Serialize origin stmt in Rollup Job and MV Meta
    
    In materialized view 2.0, the define expr is serialized in the column.
    Doris serializes the origin stmt of the Create Materialized View Stmt
    in RollupJobV2 and MVMeta, and the define expr is extracted from the
    origin stmt after the meta is deserialized.
    
    The define expr is necessary for bitmap and hll materialized views.
    For example:
    MV meta: __doris_mv_bitmap_k1, bitmap_union, to_bitmap(k1)
    Origin stmt: select bitmap_union(to_bitmap(k1)) from table
    Deserialized meta: __doris_mv_bitmap_k1, bitmap_union, null
    After extraction: the define expr `to_bitmap(k1)` is recovered from the
                      origin stmt, giving:
                      __doris_mv_bitmap_k1, bitmap_union, to_bitmap(k1)
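    
    The extraction is a name match between the re-parsed MV column items
    and the deserialized schema, mirroring setColumnsDefineExpr in the
    patch below (illustrative loop; items/schema stand for the re-parsed
    MVColumnItem list and the deserialized columns):
    
        for (MVColumnItem item : items) {
            for (Column column : schema) {
                // match by column name, then restore the define expr
                if (column.getName().equals(item.getName())) {
                    column.setDefineExpr(item.getDefineExpr());
                    break;
                }
            }
        }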
    
    Change-Id: Ic2da093188d8985f5e97be5bd094e5d60d82c9a7
    
    * Add comment of read method
    
    Change-Id: I4e1e0f4ad0f6e76cdc43e49938de768ec3b0a0e8
    
    * Fix ut
    
    Change-Id: I2be257d512bf541f00912a374a2e07a039fc42b4
    
    * Change code style
    
    Change-Id: I3ab23f5c94ae781167f498fefde2d96e42e05bf9
---
 fe/src/main/java/org/apache/doris/alter/Alter.java |   6 +-
 .../java/org/apache/doris/alter/AlterJobV2.java    |  36 +++-
 .../doris/alter/MaterializedViewHandler.java       |  12 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   | 206 ++++++++++++---------
 .../org/apache/doris/alter/SchemaChangeJobV2.java  | 167 +++--------------
 .../doris/analysis/CreateMaterializedViewStmt.java |   8 +-
 .../org/apache/doris/analysis/StatementBase.java   |  15 +-
 .../java/org/apache/doris/catalog/Catalog.java     |   3 +-
 .../main/java/org/apache/doris/catalog/Column.java |  40 ++--
 .../doris/catalog/MaterializedIndexMeta.java       |  48 ++++-
 .../java/org/apache/doris/catalog/OlapTable.java   |  11 +-
 .../main/java/org/apache/doris/common/Config.java  |   2 +-
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   4 +-
 .../org/apache/doris/load/loadv2/LoadManager.java  |   5 +-
 .../doris/load/routineload/RoutineLoadManager.java |   5 +-
 .../doris/persist/gson/GsonPostProcessable.java    |   4 +-
 .../org/apache/doris/persist/gson/GsonUtils.java   |  12 +-
 .../java/org/apache/doris/qe/ConnectProcessor.java |   3 +-
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   7 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  10 +-
 .../org/apache/doris/alter/RollupJobV2Test.java    |  50 ++++-
 .../apache/doris/alter/SchemaChangeJobV2Test.java  |  39 ++++
 .../java/org/apache/doris/catalog/ColumnTest.java  |   3 +-
 .../doris/catalog/MaterializedIndexMetaTest.java   |  53 +++++-
 .../apache/doris/catalog/TempPartitionTest.java    |   4 -
 .../doris/load/loadv2/BrokerLoadJobTest.java       |  13 +-
 .../load/routineload/RoutineLoadManagerTest.java   |   6 +-
 .../doris/persist/gson/ThriftToJsonTest.java}      |  17 +-
 .../java/org/apache/doris/qe/StmtExecutorTest.java |   6 +-
 30 files changed, 458 insertions(+), 341 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java b/fe/src/main/java/org/apache/doris/alter/Alter.java
index 4678d50..9711065 100644
--- a/fe/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/src/main/java/org/apache/doris/alter/Alter.java
@@ -80,7 +80,8 @@ public class Alter {
         clusterHandler.start();
     }
 
-    public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) throws DdlException, AnalysisException {
+    public void processCreateMaterializedView(CreateMaterializedViewStmt stmt)
+            throws DdlException, AnalysisException {
         String tableName = stmt.getBaseIndexName();
         // check db
         String dbName = stmt.getDBName();
@@ -102,7 +103,8 @@ public class Alter {
             OlapTable olapTable = (OlapTable) table;
             olapTable.checkStableAndNormal(db.getClusterName());
 
-            ((MaterializedViewHandler)materializedViewHandler).processCreateMaterializedView(stmt, db, olapTable);
+            ((MaterializedViewHandler)materializedViewHandler).processCreateMaterializedView(stmt, db,
+                    olapTable);
         } finally {
             db.writeUnlock();
         }
diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java
index 62c61cb..3a51472 100644
--- a/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -22,10 +22,13 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.OlapTable.OlapTableState;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -59,17 +62,27 @@ public abstract class AlterJobV2 implements Writable {
         ROLLUP, SCHEMA_CHANGE
     }
 
+    @SerializedName(value = "type")
     protected JobType type;
+    @SerializedName(value = "jobId")
     protected long jobId;
+    @SerializedName(value = "jobState")
     protected JobState jobState;
 
+    @SerializedName(value = "dbId")
     protected long dbId;
+    @SerializedName(value = "tableId")
     protected long tableId;
+    @SerializedName(value = "tableName")
     protected String tableName;
 
+    @SerializedName(value = "errMsg")
     protected String errMsg = "";
+    @SerializedName(value = "createTimeMs")
     protected long createTimeMs = -1;
+    @SerializedName(value = "finishedTimeMs")
     protected long finishedTimeMs = -1;
+    @SerializedName(value = "timeoutMs")
     protected long timeoutMs = -1;
 
     public AlterJobV2(long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) {
@@ -220,15 +233,20 @@ public abstract class AlterJobV2 implements Writable {
     public abstract void replay(AlterJobV2 replayedJob);
 
     public static AlterJobV2 read(DataInput in) throws IOException {
-        JobType type = JobType.valueOf(Text.readString(in));
-        switch (type) {
-            case ROLLUP:
-                return RollupJobV2.read(in);
-            case SCHEMA_CHANGE:
-                return SchemaChangeJobV2.read(in);
-            default:
-                Preconditions.checkState(false);
-                return null;
+        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86) {
+            JobType type = JobType.valueOf(Text.readString(in));
+            switch (type) {
+                case ROLLUP:
+                    return RollupJobV2.read(in);
+                case SCHEMA_CHANGE:
+                    return SchemaChangeJobV2.read(in);
+                default:
+                    Preconditions.checkState(false);
+                    return null;
+            }
+        } else {
+            String json = Text.readString(in);
+            return GsonUtils.GSON.fromJson(json, AlterJobV2.class);
         }
     }
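
For meta images at or above VERSION_86, the persistence format is just a
JSON string wrapped with Text, as the new branch above shows. A minimal
round-trip sketch (out/in stand for assumed DataOutput/DataInput streams,
job for any concrete AlterJobV2):

    // write side: serialize against the base type so the runtime type
    // adapter can record the concrete subclass (see the GsonUtils hunk below)
    String json = GsonUtils.GSON.toJson(job, AlterJobV2.class);
    Text.writeString(out, json);

    // read side: the recorded subclass selects RollupJobV2 or SchemaChangeJobV2
    AlterJobV2 restored = GsonUtils.GSON.fromJson(Text.readString(in), AlterJobV2.class);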
 
diff --git a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index 921350e..1d3fb3a 100644
--- a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -57,6 +57,7 @@ import org.apache.doris.persist.BatchDropInfo;
 import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 
@@ -89,7 +90,6 @@ import java.util.concurrent.ConcurrentHashMap;
 public class MaterializedViewHandler extends AlterHandler {
     private static final Logger LOG = LogManager.getLogger(MaterializedViewHandler.class);
     public static final String NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX = "__v2_";
-    public static final String MATERIALIZED_VIEW_NAME_PRFIX = "__doris_materialized_view_";
 
     public MaterializedViewHandler() {
         super("materialized view");
@@ -200,7 +200,7 @@ public class MaterializedViewHandler extends AlterHandler {
 
         // Step2: create mv job
         RollupJobV2 rollupJobV2 = createMaterializedViewJob(mvIndexName, baseIndexName, mvColumns, addMVClause
-                .getProperties(), olapTable, db, baseIndexId, addMVClause.getMVKeysType());
+                .getProperties(), olapTable, db, baseIndexId, addMVClause.getMVKeysType(), addMVClause.getOrigStmt());
 
         addAlterJobV2(rollupJobV2);
 
@@ -264,7 +264,7 @@ public class MaterializedViewHandler extends AlterHandler {
 
                 // step 3 create rollup job
                 RollupJobV2 alterJobV2 = createMaterializedViewJob(rollupIndexName, baseIndexName, rollupSchema, addRollupClause.getProperties(),
-                        olapTable, db, baseIndexId, olapTable.getKeysType());
+                        olapTable, db, baseIndexId, olapTable.getKeysType(), null);
 
                 rollupNameJobMap.put(addRollupClause.getRollupName(), alterJobV2);
                 logJobIdSet.add(alterJobV2.getJobId());
@@ -313,7 +313,7 @@ public class MaterializedViewHandler extends AlterHandler {
      */
     private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexName,
             List<Column> mvColumns, Map<String, String> properties, OlapTable
-            olapTable, Database db, long baseIndexId, KeysType mvKeysType)
+            olapTable, Database db, long baseIndexId, KeysType mvKeysType, OriginStatement origStmt)
             throws DdlException, AnalysisException {
         if (mvKeysType == null) {
             // assign rollup index's key type, same as base index's
@@ -337,7 +337,7 @@ public class MaterializedViewHandler extends AlterHandler {
         RollupJobV2 mvJob = new RollupJobV2(jobId, dbId, tableId, olapTable.getName(), timeoutMs,
                                             baseIndexId, mvIndexId, baseIndexName, mvName,
                                             mvColumns, baseSchemaHash, mvSchemaHash,
-                                            mvKeysType, mvShortKeyColumnCount);
+                                            mvKeysType, mvShortKeyColumnCount, origStmt);
         String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName();
         if (mvName.equals(newStorageFormatIndexName)) {
             mvJob.setStorageFormat(TStorageFormat.V2);
@@ -465,10 +465,8 @@ public class MaterializedViewHandler extends AlterHandler {
             if (mvColumnItem.getDefineExpr() != null) {
                 if (mvAggregationType.equals(AggregateType.BITMAP_UNION)) {
                     newMVColumn.setType(Type.BITMAP);
-                    newMVColumn.setName(MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + baseColumn.getName());
                 } else if (mvAggregationType.equals(AggregateType.HLL_UNION)){
                     newMVColumn.setType(Type.HLL);
-                    newMVColumn.setName(MATERIALIZED_VIEW_NAME_PRFIX + "hll_" + baseColumn.getName());
                 } else {
                     throw new DdlException("The define expr of column is only 
support bitmap_union or hll_union");
                 }
diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
index aac6d09..e856a76 100644
--- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -17,6 +17,10 @@
 
 package org.apache.doris.alter;
 
+import org.apache.doris.analysis.CreateMaterializedViewStmt;
+import org.apache.doris.analysis.MVColumnItem;
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
@@ -36,7 +40,12 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.SqlModeHelper;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskExecutor;
@@ -53,6 +62,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -60,6 +70,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.StringReader;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -70,30 +81,45 @@ import java.util.concurrent.TimeUnit;
  * This is for replacing the old RollupJob
  * https://github.com/apache/incubator-doris/issues/1429
  */
-public class RollupJobV2 extends AlterJobV2 {
+public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
     private static final Logger LOG = LogManager.getLogger(RollupJobV2.class);
 
     // partition id -> (rollup tablet id -> base tablet id)
+    @SerializedName(value = "partitionIdToBaseRollupTabletIdMap")
     private Map<Long, Map<Long, Long>> partitionIdToBaseRollupTabletIdMap = Maps.newHashMap();
+    @SerializedName(value = "partitionIdToRollupIndex")
     private Map<Long, MaterializedIndex> partitionIdToRollupIndex = Maps.newHashMap();
 
     // rollup and base schema info
+    @SerializedName(value = "baseIndexId")
     private long baseIndexId;
+    @SerializedName(value = "rollupIndexId")
     private long rollupIndexId;
+    @SerializedName(value = "baseIndexName")
     private String baseIndexName;
+    @SerializedName(value =  "rollupIndexName")
     private String rollupIndexName;
 
+    @SerializedName(value = "rollupSchema")
     private List<Column> rollupSchema = Lists.newArrayList();
+    @SerializedName(value = "baseSchemaHash")
     private int baseSchemaHash;
+    @SerializedName(value = "rollupSchemaHash")
     private int rollupSchemaHash;
 
+    @SerializedName(value = "rollupKeysType")
     private KeysType rollupKeysType;
+    @SerializedName(value = "rollupShortKeyColumnCount")
     private short rollupShortKeyColumnCount;
+    @SerializedName(value = "origStmt")
+    private OriginStatement origStmt;
 
     // optional
+    @SerializedName(value = "storageFormat")
     private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
 
     // The rollup job will wait all transactions before this txn id finished, then send the rollup tasks.
+    @SerializedName(value = "watershedTxnId")
     protected long watershedTxnId = -1;
 
     // save all create rollup tasks
@@ -101,8 +127,8 @@ public class RollupJobV2 extends AlterJobV2 {
 
     public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs,
             long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName,
-            List<Column> rollupSchema, int baseSchemaHash, int rollupSchemaHash,
-            KeysType rollupKeysType, short rollupShortKeyColumnCount) {
+            List<Column> rollupSchema, int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType,
+            short rollupShortKeyColumnCount, OriginStatement origStmt) {
         super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs);
 
         this.baseIndexId = baseIndexId;
@@ -115,6 +141,8 @@ public class RollupJobV2 extends AlterJobV2 {
         this.rollupSchemaHash = rollupSchemaHash;
         this.rollupKeysType = rollupKeysType;
         this.rollupShortKeyColumnCount = rollupShortKeyColumnCount;
+
+        this.origStmt = origStmt;
     }
 
     private RollupJobV2() {
@@ -278,7 +306,7 @@ public class RollupJobV2 extends AlterJobV2 {
         }
 
         tbl.setIndexMeta(rollupIndexId, rollupIndexName, rollupSchema, 0 /* init schema version */,
-                rollupSchemaHash, rollupShortKeyColumnCount,TStorageType.COLUMN, rollupKeysType);
+                rollupSchemaHash, rollupShortKeyColumnCount,TStorageType.COLUMN, rollupKeysType, origStmt);
     }
 
     /**
@@ -506,90 +534,6 @@ public class RollupJobV2 extends AlterJobV2 {
         return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId));
     }
 
-    public static RollupJobV2 read(DataInput in) throws IOException {
-        RollupJobV2 rollupJob = new RollupJobV2();
-        rollupJob.readFields(in);
-        return rollupJob;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        super.write(out);
-
-        out.writeInt(partitionIdToRollupIndex.size());
-        for (long partitionId : partitionIdToRollupIndex.keySet()) {
-            out.writeLong(partitionId);
-
-            out.writeInt(partitionIdToBaseRollupTabletIdMap.get(partitionId).size());
-            for (Map.Entry<Long, Long> entry : partitionIdToBaseRollupTabletIdMap.get(partitionId).entrySet()) {
-                out.writeLong(entry.getKey());
-                out.writeLong(entry.getValue());
-            }
-
-            MaterializedIndex rollupIndex = partitionIdToRollupIndex.get(partitionId);
-            rollupIndex.write(out);
-        }
-
-        out.writeLong(baseIndexId);
-        out.writeLong(rollupIndexId);
-        Text.writeString(out, baseIndexName);
-        Text.writeString(out, rollupIndexName);
-
-        // rollup schema
-        out.writeInt(rollupSchema.size());
-        for (Column column : rollupSchema) {
-            column.write(out);
-        }
-        out.writeInt(baseSchemaHash);
-        out.writeInt(rollupSchemaHash);
-
-        Text.writeString(out, rollupKeysType.name());
-        out.writeShort(rollupShortKeyColumnCount);
-
-        out.writeLong(watershedTxnId);
-        Text.writeString(out, storageFormat.name());
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        super.readFields(in);
-
-        int size = in.readInt();
-        for (int i = 0; i < size; i++) {
-            long partitionId = in.readLong();
-            int size2 = in.readInt();
-            Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap.computeIfAbsent(partitionId, k -> Maps.newHashMap());
-            for (int j = 0; j < size2; j++) {
-                long rollupTabletId = in.readLong();
-                long baseTabletId = in.readLong();
-                tabletIdMap.put(rollupTabletId, baseTabletId);
-            }
-
-            partitionIdToRollupIndex.put(partitionId, MaterializedIndex.read(in));
-        }
-
-        baseIndexId = in.readLong();
-        rollupIndexId = in.readLong();
-        baseIndexName = Text.readString(in);
-        rollupIndexName = Text.readString(in);
-
-        size = in.readInt();
-        for (int i = 0; i < size; i++) {
-            Column column = Column.read(in);
-            rollupSchema.add(column);
-        }
-        baseSchemaHash = in.readInt();
-        rollupSchemaHash = in.readInt();
-
-        rollupKeysType = KeysType.valueOf(Text.readString(in));
-        rollupShortKeyColumnCount = in.readShort();
-
-        watershedTxnId = in.readLong();
-        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_85) {
-            storageFormat = TStorageFormat.valueOf(Text.readString(in));
-        }
-    }
-
     /**
      * Replay job in PENDING state.
      * Should replay all changes before this job's state transfer to PENDING.
@@ -773,4 +717,90 @@ public class RollupJobV2 extends AlterJobV2 {
         this.jobState = jobState;
     }
 
+    private void setColumnsDefineExpr(List<MVColumnItem> items) {
+        for (MVColumnItem item : items) {
+            for (Column column : rollupSchema) {
+                if (column.getName().equals(item.getName())) {
+                    column.setDefineExpr(item.getDefineExpr());
+                    break;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this, AlterJobV2.class);
+        Text.writeString(out, json);
+    }
+
+    /**
+     * This method is only used to deserialize meta whose version is less than 86.
+     * If the meta version is >= 86, it will be deserialized by the `read` of AlterJobV2 rather than here.
+     */
+    public static RollupJobV2 read(DataInput in) throws IOException {
+        Preconditions.checkState(Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86);
+        RollupJobV2 rollupJob = new RollupJobV2();
+        rollupJob.readFields(in);
+        return rollupJob;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            long partitionId = in.readLong();
+            int size2 = in.readInt();
+            Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap.computeIfAbsent(partitionId, k -> Maps.newHashMap());
+            for (int j = 0; j < size2; j++) {
+                long rollupTabletId = in.readLong();
+                long baseTabletId = in.readLong();
+                tabletIdMap.put(rollupTabletId, baseTabletId);
+            }
+
+            partitionIdToRollupIndex.put(partitionId, MaterializedIndex.read(in));
+        }
+
+        baseIndexId = in.readLong();
+        rollupIndexId = in.readLong();
+        baseIndexName = Text.readString(in);
+        rollupIndexName = Text.readString(in);
+
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            Column column = Column.read(in);
+            rollupSchema.add(column);
+        }
+        baseSchemaHash = in.readInt();
+        rollupSchemaHash = in.readInt();
+
+        rollupKeysType = KeysType.valueOf(Text.readString(in));
+        rollupShortKeyColumnCount = in.readShort();
+
+        watershedTxnId = in.readLong();
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_85) {
+            storageFormat = TStorageFormat.valueOf(Text.readString(in));
+        }
+    }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        // analyze define stmt
+        if (origStmt == null) {
+            return;
+        }
+        // parse the define stmt to schema
+        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt),
+                SqlModeHelper.MODE_DEFAULT));
+        CreateMaterializedViewStmt stmt;
+        try {
+            stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, origStmt.idx);
+            stmt.analyzeSelectClause();
+            setColumnsDefineExpr(stmt.getMVColumnItemList());
+        } catch (Exception e) {
+            throw new IOException("error happens when parsing create 
materialized view stmt: " + origStmt, e);
+        }
+    }
 }
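
Callers never invoke gsonPostProcess() directly: the
PostProcessTypeAdapterFactory already registered in GsonUtils runs it
after each fromJson, so restoring a job implicitly re-parses origStmt.
An illustrative flow:

    // deserializing a >= VERSION_86 rollup job (sketch):
    AlterJobV2 job = GsonUtils.GSON.fromJson(json, AlterJobV2.class);
    // GSON builds a RollupJobV2, then the post-process adapter calls
    // job.gsonPostProcess(), which parses origStmt with SqlParser and
    // re-attaches each column's define expr via setColumnsDefineExpr().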
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 71236ec..08838eb 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskExecutor;
@@ -57,8 +58,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Table;
 import com.google.common.collect.Table.Cell;
+import com.google.gson.annotations.SerializedName;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -81,35 +82,44 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
     private static final Logger LOG = LogManager.getLogger(SchemaChangeJobV2.class);
 
     // partition id -> (shadow index id -> (shadow tablet id -> origin tablet id))
+    @SerializedName(value = "partitionIndexTabletMap")
     private Table<Long, Long, Map<Long, Long>> partitionIndexTabletMap = HashBasedTable.create();
     // partition id -> (shadow index id -> shadow index))
     private Table<Long, Long, MaterializedIndex> partitionIndexMap = HashBasedTable.create();
     // shadow index id -> origin index id
+    @SerializedName(value = "indexIdMap")
     private Map<Long, Long> indexIdMap = Maps.newHashMap();
     // shadow index id -> shadow index name(__doris_shadow_xxx)
+    @SerializedName(value = "indexIdToName")
     private Map<Long, String> indexIdToName = Maps.newHashMap();
     // shadow index id -> index schema
+    @SerializedName(value = "indexSchemaMap")
     private Map<Long, List<Column>> indexSchemaMap = Maps.newHashMap();
     // shadow index id -> (shadow index schema version : schema hash)
+    @SerializedName(value = "indexSchemaVersionAndHashMap")
     private Map<Long, Pair<Integer, Integer>> indexSchemaVersionAndHashMap = Maps.newHashMap();
     // shadow index id -> shadow index short key count
+    @SerializedName(value = "indexShortKeyMap")
     private Map<Long, Short> indexShortKeyMap = Maps.newHashMap();
 
-    // identify whether the job is finished and no need to persist some data
-    private boolean isMetaPruned = false;
-
     // bloom filter info
+    @SerializedName(value = "hasBfChange")
     private boolean hasBfChange;
+    @SerializedName(value = "bfColumns")
     private Set<String> bfColumns = null;
+    @SerializedName(value = "bfFpp")
     private double bfFpp = 0;
 
     // alter index info
+    @SerializedName(value = "indexChange")
     private boolean indexChange = false;
+    @SerializedName(value = "indexes")
     private List<Index> indexes = null;
 
     // The schema change job will wait all transactions before this txn id finished, then send the schema change tasks.
+    @SerializedName(value = "watershedTxnId")
     protected long watershedTxnId = -1;
-
+    @SerializedName(value = "storageFormat")
     private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
 
     // save all schema change tasks
@@ -170,7 +180,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         partitionIndexMap.clear();
         indexSchemaMap.clear();
         indexShortKeyMap.clear();
-        isMetaPruned = true;
     }
 
     /**
@@ -642,12 +651,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId));
     }
 
-    public static SchemaChangeJobV2 read(DataInput in) throws IOException {
-        SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2();
-        schemaChangeJob.readFields(in);
-        return schemaChangeJob;
-    }
-
     /**
      * Replay job in PENDING state.
      * Should replay all changes before this job's state transfer to PENDING.
@@ -827,80 +830,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
     }
 
     /**
-     * write data need to persist when job not finish
-     */
-    private void writeJobNotFinishData(DataOutput out) throws IOException {
-        out.writeInt(partitionIndexTabletMap.rowKeySet().size());
-        for (Long partitionId : partitionIndexTabletMap.rowKeySet()) {
-            out.writeLong(partitionId);
-            Map<Long, Map<Long, Long>> indexTabletMap = partitionIndexTabletMap.row(partitionId);
-            out.writeInt(indexTabletMap.size());
-            for (Long shadowIndexId : indexTabletMap.keySet()) {
-                out.writeLong(shadowIndexId);
-                // tablet id map
-                Map<Long, Long> tabletMap = indexTabletMap.get(shadowIndexId);
-                out.writeInt(tabletMap.size());
-                for (Map.Entry<Long, Long> entry : tabletMap.entrySet()) {
-                    out.writeLong(entry.getKey());
-                    out.writeLong(entry.getValue());
-                }
-                // shadow index
-                MaterializedIndex shadowIndex = partitionIndexMap.get(partitionId, shadowIndexId);
-                shadowIndex.write(out);
-            }
-        }
-
-        // shadow index info
-        out.writeInt(indexIdMap.size());
-        for (Map.Entry<Long, Long> entry : indexIdMap.entrySet()) {
-            long shadowIndexId = entry.getKey();
-            out.writeLong(shadowIndexId);
-            // index id map
-            out.writeLong(entry.getValue());
-            // index name
-            Text.writeString(out, indexIdToName.get(shadowIndexId));
-            // index schema
-            out.writeInt(indexSchemaMap.get(shadowIndexId).size());
-            for (Column column : indexSchemaMap.get(shadowIndexId)) {
-                column.write(out);
-            }
-            // index schema version and hash
-            out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).first);
-            out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).second);
-            // short key count
-            out.writeShort(indexShortKeyMap.get(shadowIndexId));
-        }
-
-        // bloom filter
-        out.writeBoolean(hasBfChange);
-        if (hasBfChange) {
-            out.writeInt(bfColumns.size());
-            for (String bfCol : bfColumns) {
-                Text.writeString(out, bfCol);
-            }
-            out.writeDouble(bfFpp);
-        }
-
-        out.writeLong(watershedTxnId);
-
-        // index
-        out.writeBoolean(indexChange);
-        if (indexChange) {
-            if (CollectionUtils.isNotEmpty(indexes)) {
-                out.writeBoolean(true);
-                out.writeInt(indexes.size());
-                for (Index index : indexes) {
-                    index.write(out);
-                }
-            } else {
-                out.writeBoolean(false);
-            }
-        }
-
-        Text.writeString(out, storageFormat.name());
-    }
-
-    /**
      * read data need to persist when job not finish
      */
     private void readJobNotFinishData(DataInput in) throws IOException {
@@ -983,53 +912,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
     }
 
     /**
-     * write data need to persist when job finished
-     */
-    private void writeJobFinishedData(DataOutput out) throws IOException {
-        // only persist data will be used in getInfo
-        out.writeInt(indexIdMap.size());
-        for (Entry<Long, Long> entry : indexIdMap.entrySet()) {
-            long shadowIndexId = entry.getKey();
-            out.writeLong(shadowIndexId);
-            // index id map
-            out.writeLong(entry.getValue());
-            // index name
-            Text.writeString(out, indexIdToName.get(shadowIndexId));
-            // index schema version and hash
-            out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).first);
-            out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).second);
-        }
-
-        // bloom filter
-        out.writeBoolean(hasBfChange);
-        if (hasBfChange) {
-            out.writeInt(bfColumns.size());
-            for (String bfCol : bfColumns) {
-                Text.writeString(out, bfCol);
-            }
-            out.writeDouble(bfFpp);
-        }
-
-        out.writeLong(watershedTxnId);
-
-        // index
-        out.writeBoolean(indexChange);
-        if (indexChange) {
-            if (CollectionUtils.isNotEmpty(indexes)) {
-                out.writeBoolean(true);
-                out.writeInt(indexes.size());
-                for (Index index : indexes) {
-                    index.write(out);
-                }
-            } else {
-                out.writeBoolean(false);
-            }
-        }
-
-        Text.writeString(out, storageFormat.name());
-    }
-
-    /**
      * read data need to persist when job finished
      */
     private void readJobFinishedData(DataInput in) throws IOException {
@@ -1082,14 +964,19 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
     @Override
     public void write(DataOutput out) throws IOException {
-        super.write(out);
+        String json = GsonUtils.GSON.toJson(this, AlterJobV2.class);
+        Text.writeString(out, json);
+    }
 
-        out.writeBoolean(isMetaPruned);
-        if (isMetaPruned) {
-            writeJobFinishedData(out);
-        } else {
-            writeJobNotFinishData(out);
-        }
+    /**
+     * This method is only used to deserialize meta whose version is less than 86.
+     * If the meta version is >= 86, it will be deserialized by the `read` of AlterJobV2 rather than here.
+     */
+    public static SchemaChangeJobV2 read(DataInput in) throws IOException {
+        Preconditions.checkState(Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86);
+        SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2();
+        schemaChangeJob.readFields(in);
+        return schemaChangeJob;
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java
index 5690be3..56ec28d 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java
@@ -49,6 +49,8 @@ import java.util.Set;
  * [PROPERTIES ("key" = "value")]
  */
 public class CreateMaterializedViewStmt extends DdlStmt {
+    public static final String MATERIALIZED_VIEW_NAME_PRFIX = "__doris_materialized_view_";
+
     private String mvName;
     private SelectStmt selectStmt;
     private Map<String, String> properties;
@@ -100,9 +102,8 @@ public class CreateMaterializedViewStmt extends DdlStmt {
 
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
-        // TODO(ml): remove it
         if (!Config.enable_materialized_view) {
-            throw new AnalysisException("The materialized view is coming 
soon");
+            throw new AnalysisException("The materialized view is disabled");
         }
         super.analyze(analyzer);
         FeNameFormat.checkTableName(mvName);
@@ -128,7 +129,7 @@ public class CreateMaterializedViewStmt extends DdlStmt {
         }
     }
 
-    private void analyzeSelectClause() throws AnalysisException {
+    public void analyzeSelectClause() throws AnalysisException {
         SelectList selectList = selectStmt.getSelectList();
         if (selectList.getItems().isEmpty()) {
             throw new AnalysisException("The materialized view must contain at 
least one column");
@@ -200,6 +201,7 @@ public class CreateMaterializedViewStmt extends DdlStmt {
                     beginIndexOfAggregation = i;
                 }
                 // TODO(ml): support different type of column, int -> bigint(sum)
+                // TODO: change the column name of bitmap and hll
                 MVColumnItem mvColumnItem = new MVColumnItem(columnName);
                 mvColumnItem.setAggregationType(AggregateType.valueOf(functionName.toUpperCase()), false);
                 mvColumnItem.setDefineExpr(defineExpr);
diff --git a/fe/src/main/java/org/apache/doris/analysis/StatementBase.java b/fe/src/main/java/org/apache/doris/analysis/StatementBase.java
index 7410028..3fd62aa 100644
--- a/fe/src/main/java/org/apache/doris/analysis/StatementBase.java
+++ b/fe/src/main/java/org/apache/doris/analysis/StatementBase.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
+import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.rewrite.ExprRewriter;
 
 import com.google.common.base.Preconditions;
@@ -46,6 +47,8 @@ public abstract class StatementBase implements ParseNode {
     // END: Members that need to be reset()
     /////////////////////////////////////////
 
+    private OriginStatement origStmt;
+
     protected StatementBase() { }
 
     /**
@@ -153,7 +156,17 @@ public abstract class StatementBase implements ParseNode {
 
     public void setClusterName(String clusterName) {
         this.clusterName = clusterName;
-    } 
+    }
+
+    public void setOrigStmt(OriginStatement origStmt) {
+        Preconditions.checkState(origStmt != null);
+        this.origStmt = origStmt;
+    }
+
+    public OriginStatement getOrigStmt() {
+        return origStmt;
+    }
+
     /**
      * Resets the internal analysis state of this node.
      * For easier maintenance, class members that need to be reset are grouped into
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 4c44409..cf95e6d 100755
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4903,7 +4903,8 @@ public class Catalog {
         this.alter.processAlterView(stmt, ConnectContext.get());
     }
 
-    public void createMaterializedView(CreateMaterializedViewStmt stmt) throws AnalysisException, DdlException {
+    public void createMaterializedView(CreateMaterializedViewStmt stmt)
+            throws AnalysisException, DdlException {
         this.alter.processCreateMaterializedView(stmt);
     }
 
diff --git a/fe/src/main/java/org/apache/doris/catalog/Column.java b/fe/src/main/java/org/apache/doris/catalog/Column.java
index d1ce8a6..461cf6a 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Column.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TColumnType;
 
@@ -67,7 +68,7 @@ public class Column implements Writable {
     private String comment;
     @SerializedName(value = "stats")
     private ColumnStats stats;     // cardinality and selectivity etc.
-    private Expr defineExpr; //use to define materialize view
+    private Expr defineExpr; // use to define column in materialize view
 
     public Column() {
         this.name = "";
@@ -411,31 +412,11 @@ public class Column implements Writable {
 
     @Override
     public void write(DataOutput out) throws IOException {
-        Text.writeString(out, name);
-        ColumnType.write(out, type);
-        if (null == aggregationType) {
-            out.writeBoolean(false);
-        } else {
-            out.writeBoolean(true);
-            Text.writeString(out, aggregationType.name());
-            out.writeBoolean(isAggregationTypeImplicit);
-        }
-
-        out.writeBoolean(isKey);
-        out.writeBoolean(isAllowNull);
-
-        if (defaultValue == null) {
-            out.writeBoolean(false);
-        } else {
-            out.writeBoolean(true);
-            Text.writeString(out, defaultValue);
-        }
-        stats.write(out);
-
-        Text.writeString(out, comment);
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
     }
 
-    public void readFields(DataInput in) throws IOException {
+    private void readFields(DataInput in) throws IOException {
         name = Text.readString(in);
         type = ColumnType.read(in);
         boolean notNull = in.readBoolean();
@@ -475,8 +456,13 @@ public class Column implements Writable {
     }
 
     public static Column read(DataInput in) throws IOException {
-        Column column = new Column();
-        column.readFields(in);
-        return column;
+        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86) {
+            Column column = new Column();
+            column.readFields(in);
+            return column;
+        } else {
+            String json = Text.readString(in);
+            return GsonUtils.GSON.fromJson(json, Column.class);
+        }
     }
 }
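
With the Gson path, Column.write() now emits the column's
@SerializedName fields as a JSON string; an illustrative, abbreviated
payload (hypothetical field values) might look like:

    // Text-wrapped JSON written by Column.write() (sketch):
    // {"name":"k1","type":{...},"isKey":true,"isAllowNull":false,
    //  "comment":"","stats":{...}}
    // Note: the define expr itself is not persisted in this payload; per
    // this patch it is rebuilt from the serialized origin stmt instead.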
diff --git a/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java b/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java
index 25a8bb5..2e224f9 100644
--- a/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java
+++ b/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java
@@ -17,9 +17,17 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.analysis.CreateMaterializedViewStmt;
+import org.apache.doris.analysis.MVColumnItem;
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.SqlModeHelper;
 import org.apache.doris.thrift.TStorageType;
 
 import com.google.common.base.Preconditions;
@@ -29,9 +37,10 @@ import com.google.gson.annotations.SerializedName;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.StringReader;
 import java.util.List;
 
-public class MaterializedIndexMeta implements Writable {
+public class MaterializedIndexMeta implements Writable, GsonPostProcessable {
     @SerializedName(value = "indexId")
     private long indexId;
     @SerializedName(value = "schema")
@@ -46,9 +55,11 @@ public class MaterializedIndexMeta implements Writable {
     private TStorageType storageType;
     @SerializedName(value = "keysType")
     private KeysType keysType;
+    @SerializedName(value = "defineStmt")
+    private OriginStatement defineStmt;
 
-    public MaterializedIndexMeta(long indexId, List<Column> schema, int schemaVersion, int
-            schemaHash, short shortKeyColumnCount, TStorageType storageType, KeysType keysType) {
+    public MaterializedIndexMeta(long indexId, List<Column> schema, int schemaVersion, int schemaHash,
+            short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement defineStmt) {
         this.indexId = indexId;
         Preconditions.checkState(schema != null);
         Preconditions.checkState(schema.size() != 0);
@@ -60,6 +71,7 @@ public class MaterializedIndexMeta implements Writable {
         this.storageType = storageType;
         Preconditions.checkState(keysType != null);
         this.keysType = keysType;
+        this.defineStmt = defineStmt;
     }
 
     public long getIndexId() {
@@ -94,6 +106,17 @@ public class MaterializedIndexMeta implements Writable {
         return schemaVersion;
     }
 
+    private void setColumnsDefineExpr(List<MVColumnItem> items) {
+        for (MVColumnItem item : items) {
+            for (Column column : schema) {
+                if (column.getName().equals(item.getName())) {
+                    column.setDefineExpr(item.getDefineExpr());
+                    break;
+                }
+            }
+        }
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (!(obj instanceof MaterializedIndexMeta)) {
@@ -134,4 +157,23 @@ public class MaterializedIndexMeta implements Writable {
         return GsonUtils.GSON.fromJson(json, MaterializedIndexMeta.class);
     }
 
+    @Override
+    public void gsonPostProcess() throws IOException {
+        // analyze define stmt
+        if (defineStmt == null) {
+            return;
+        }
+        // parse the define stmt to schema
+        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(defineStmt.originStmt),
+                SqlModeHelper.MODE_DEFAULT));
+        CreateMaterializedViewStmt stmt;
+        try {
+            stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, defineStmt.idx);
+            stmt.analyzeSelectClause();
+            setColumnsDefineExpr(stmt.getMVColumnItemList());
+        } catch (Exception e) {
+            throw new IOException("error happens when parsing create 
materialized view stmt: " + defineStmt, e);
+        }
+    }
+
 }
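
Only analyzeSelectClause() is re-run here (it was made public in the
CreateMaterializedViewStmt hunk above for exactly this purpose), which
is enough to rebuild the MVColumnItem define exprs without a full
analyze pass. A sketch of the effective recovery path:

    // read() -> GSON.fromJson -> gsonPostProcess() (illustrative):
    MaterializedIndexMeta meta = MaterializedIndexMeta.read(in);
    // if defineStmt is non-null, the stored CREATE MATERIALIZED VIEW text
    // is re-parsed, analyzeSelectClause() rebuilds the MVColumnItem list,
    // and setColumnsDefineExpr() copies each define expr onto the schema
    // column with the matching name.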
diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index f307a5f..b4d60f6 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TOlapTable;
 import org.apache.doris.thrift.TStorageFormat;
@@ -246,6 +247,12 @@ public class OlapTable extends Table {
 
     public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int schemaHash,
             short shortKeyColumnCount, TStorageType storageType, KeysType keysType) {
+        setIndexMeta(indexId, indexName, schema, schemaVersion, schemaHash, shortKeyColumnCount, storageType, keysType,
+                null);
+    }
+
+    public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int schemaHash,
+            short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement origStmt) {
         // Nullable when meta comes from schema change log replay.
         // The replay log only save the index id, so we need to get name by id.
         if (indexName == null) {
@@ -268,7 +275,7 @@ public class OlapTable extends Table {
         }
 
         MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema, schemaVersion,
-                schemaHash, shortKeyColumnCount, storageType, keysType);
+                schemaHash, shortKeyColumnCount, storageType, keysType, origStmt);
         indexIdToMeta.put(indexId, indexMeta);
         indexNameToId.put(indexName, indexId);
     }
@@ -970,7 +977,7 @@ public class OlapTable extends Table {
 
                 // The keys type in here is incorrect
                 MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema,
-                        schemaVersion, schemaHash, shortKeyColumnCount, storageType, KeysType.AGG_KEYS);
+                        schemaVersion, schemaHash, shortKeyColumnCount, storageType, KeysType.AGG_KEYS, null);
                 tmpIndexMetaList.add(indexMeta);
             } else {
                 MaterializedIndexMeta indexMeta = MaterializedIndexMeta.read(in);
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java
index 5cdf4f6..c29921f 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -1060,7 +1060,7 @@ public class Config extends ConfigBase {
      * control materialized view
      */
     @ConfField(mutable = true, masterOnly = true)
-    public static boolean enable_materialized_view = false;
+    public static boolean enable_materialized_view = true;
 
     /**
      * it can't auto-resume routine load job as long as one of the backends is down
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index f4d1f8c..dabd7a8 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -181,6 +181,8 @@ public final class FeMetaVersion {
     public static final int VERSION_84 = 84;
     // add storage format in rollup job
     public static final int VERSION_85 = 85;
+    // serialize origStmt in rollupJob and mv meta
+    public static final int VERSION_86 = 86;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_85;
+    public static final int VERSION_CURRENT = VERSION_86;
 }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index c4d7d2c..8d817f0 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -124,7 +124,7 @@ public class BrokerLoadJob extends LoadJob {
         }
     }
 
-    public static BrokerLoadJob fromLoadStmt(LoadStmt stmt, OriginStatement originStmt) throws DdlException {
+    public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
         // get db id
         String dbName = stmt.getLabel().getDbName();
         Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName());
@@ -135,7 +135,7 @@ public class BrokerLoadJob extends LoadJob {
         // create job
         try {
             BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(),
-                    stmt.getBrokerDesc(), originStmt);
+                    stmt.getBrokerDesc(), stmt.getOrigStmt());
             brokerLoadJob.setJobProperties(stmt.getProperties());
             brokerLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
             return brokerLoadJob;
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index e123e41..e0cb91a 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -39,7 +39,6 @@ import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.Load;
-import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TMiniLoadBeginRequest;
 import org.apache.doris.thrift.TMiniLoadRequest;
@@ -98,7 +97,7 @@ public class LoadManager implements Writable{
      * @param stmt
      * @throws DdlException
      */
-    public void createLoadJobFromStmt(LoadStmt stmt, OriginStatement originStmt) throws DdlException {
+    public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
         Database database = checkDb(stmt.getLabel().getDbName());
         long dbId = database.getId();
         LoadJob loadJob = null;
@@ -112,7 +111,7 @@ public class LoadManager implements Writable{
                 throw new DdlException("There are more then " + 
Config.desired_max_waiting_jobs + " load jobs in waiting queue, "
                                                + "please retry later.");
             }
-            loadJob = BrokerLoadJob.fromLoadStmt(stmt, originStmt);
+            loadJob = BrokerLoadJob.fromLoadStmt(stmt);
             createLoadJob(loadJob);
         } finally {
             writeUnlock();
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 0efb6fe..061d7fb 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -37,7 +37,6 @@ import org.apache.doris.common.util.LogKey;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.OriginStatement;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -117,7 +116,7 @@ public class RoutineLoadManager implements Writable {
 
     }
 
-    public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt, OriginStatement origStmt)
+    public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt)
             throws UserException {
         // check load auth
         if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
@@ -141,7 +140,7 @@ public class RoutineLoadManager implements Writable {
                 throw new UserException("Unknown data source type: " + type);
         }
 
-        routineLoadJob.setOrigStmt(origStmt);
+        routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt());
         addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName());
     }
 
diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java b/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java
index c09f2ad..6733d79 100644
--- a/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java
+++ b/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.persist.gson;
 
+import java.io.IOException;
+
 public interface GsonPostProcessable {
-    public void gsonPostProcess();
+    public void gsonPostProcess() throws IOException;
 }
diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 033ab03..10e1ad7 100644
--- a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.persist.gson;
 
+import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.alter.RollupJobV2;
+import org.apache.doris.alter.SchemaChangeJobV2;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.HashDistributionInfo;
@@ -93,6 +96,12 @@ public class GsonUtils {
             .of(Resource.class, "clazz")
             .registerSubtype(SparkResource.class, SparkResource.class.getSimpleName());
 
+    // runtime adapter for class "AlterJobV2"
+    private static RuntimeTypeAdapterFactory<AlterJobV2> alterJobV2TypeAdapterFactory = RuntimeTypeAdapterFactory
+            .of(AlterJobV2.class, "clazz")
+            .registerSubtype(RollupJobV2.class, RollupJobV2.class.getSimpleName())
+            .registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName());
+
     // the builder of GSON instance.
     // Add any other adapters if necessary.
     private static final GsonBuilder GSON_BUILDER = new GsonBuilder()
@@ -103,7 +112,8 @@ public class GsonUtils {
             .registerTypeAdapterFactory(new PostProcessTypeAdapterFactory())
             .registerTypeAdapterFactory(columnTypeAdapterFactory)
             .registerTypeAdapterFactory(distributionInfoTypeAdapterFactory)
-            .registerTypeAdapterFactory(resourceTypeAdapterFactory);
+            .registerTypeAdapterFactory(resourceTypeAdapterFactory)
+            .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory);
 
     // this instance is thread-safe.
     public static final Gson GSON = GSON_BUILDER.create();
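
The "clazz" field written by this factory is what lets AlterJobV2.read
stay generic across subclasses; an illustrative (hand-written,
abbreviated) payload and round trip:

    // {"clazz":"RollupJobV2","type":"ROLLUP","jobId":10001, ...} (sketch)
    AlterJobV2 job = GsonUtils.GSON.fromJson(json, AlterJobV2.class);
    // job is a RollupJobV2: the factory maps the simple class name stored
    // in "clazz" back to the registered subtype.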
diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index d94733a..83ef187 100644
--- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -178,7 +178,8 @@ public class ConnectProcessor {
                     ctx.resetRetureRows();
                 }
                 parsedStmt = stmts.get(i);
-                executor = new StmtExecutor(ctx, parsedStmt, new OriginStatement(originStmt, i));
+                parsedStmt.setOrigStmt(new OriginStatement(originStmt, i));
+                executor = new StmtExecutor(ctx, parsedStmt);
                 executor.execute();
 
                 if (i != stmts.size() - 1) {
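
For clarity on the refactor above: OriginStatement pairs the full original SQL
text with the index of one statement inside it, which is what lets a statement
parsed from a multi-statement packet be re-parsed later (see the
SqlParserUtils.getStmt(parser, originStmt.idx) call in the StmtExecutor diff
below). A small illustration, assuming only the (originStmt, idx) constructor
used above:

    // One client packet can carry several statements; each parsed stmt
    // now carries the whole text plus its own index into it.
    String originStmt = "select 1; select 2";
    OriginStatement first = new OriginStatement(originStmt, 0);   // "select 1"
    OriginStatement second = new OriginStatement(originStmt, 1);  // "select 2"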
diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 284daf6..db1a0e5 100644
--- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -82,8 +82,7 @@ import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.Load;
 
 public class DdlExecutor {
-    public static void execute(Catalog catalog, DdlStmt ddlStmt, OriginStatement origStmt)
-            throws DdlException, QueryStateException, Exception {
+    public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception {
         if (ddlStmt instanceof CreateClusterStmt) {
             CreateClusterStmt stmt = (CreateClusterStmt) ddlStmt;
             catalog.createCluster(stmt);
@@ -132,7 +131,7 @@ public class DdlExecutor {
             if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) {
                 catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis());
             } else {
-                catalog.getLoadManager().createLoadJobFromStmt(loadStmt, origStmt);
+                catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
             }
         } else if (ddlStmt instanceof CancelLoadStmt) {
             if (catalog.getLoadInstance().isLabelExist(
@@ -142,7 +141,7 @@ public class DdlExecutor {
                 catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
             }
         } else if (ddlStmt instanceof CreateRoutineLoadStmt) {
-            catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt, origStmt);
+            catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt);
         } else if (ddlStmt instanceof PauseRoutineLoadStmt) {
             catalog.getRoutineLoadManager().pauseRoutineLoadJob((PauseRoutineLoadStmt) ddlStmt);
         } else if (ddlStmt instanceof ResumeRoutineLoadStmt) {
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index b5ccb65..114f9cf 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -128,12 +128,12 @@ public class StmtExecutor {
     }
 
     // constructor for receiving parsed stmt from connect processor
-    public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt, OriginStatement originStmt) {
+    public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt) {
         this.context = ctx;
-        this.originStmt = originStmt;
+        this.parsedStmt = parsedStmt;
+        this.originStmt = parsedStmt.getOrigStmt();
         this.serializer = context.getSerializer();
         this.isProxy = false;
-        this.parsedStmt = parsedStmt;
     }
 
     // At the end of query execution, we begin to add up profile
@@ -376,7 +376,7 @@ public class StmtExecutor {
             SqlParser parser = new SqlParser(input);
             try {
                 parsedStmt = SqlParserUtils.getStmt(parser, originStmt.idx);
-
+                parsedStmt.setOrigStmt(originStmt);
             } catch (Error e) {
                 LOG.info("error happened when parsing stmt {}, id: {}", 
originStmt, context.getStmtId(), e);
                 throw new AnalysisException("sql parsing error, please check 
your sql");
@@ -889,7 +889,7 @@ public class StmtExecutor {
 
     private void handleDdlStmt() {
         try {
-            DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt, originStmt);
+            DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt);
             context.getState().setOk();
         } catch (QueryStateException e) {
             context.setState(e.getQueryState());
diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
index 35429d0..e3fd856 100644
--- a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
@@ -24,6 +24,13 @@ import org.apache.doris.analysis.AccessTestUtil;
 import org.apache.doris.analysis.AddRollupClause;
 import org.apache.doris.analysis.AlterClause;
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.CreateMaterializedViewStmt;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.FunctionName;
+import org.apache.doris.analysis.MVColumnItem;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
@@ -34,7 +41,6 @@ import org.apache.doris.catalog.FakeEditLog;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
-import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.OlapTable.OlapTableState;
 import org.apache.doris.catalog.Partition;
@@ -48,6 +54,7 @@ import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.meta.MetaContext;
+import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.thrift.TStorageFormat;
@@ -60,7 +67,6 @@ import com.google.common.collect.Lists;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.DataInputStream;
@@ -367,15 +373,21 @@ public class RollupJobV2Test {
 
 
     @Test
-    public void testSerializeOfRollupJob() throws IOException {
+    public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt) throws IOException {
         // prepare file
         File file = new File(fileName);
         file.createNewFile();
         DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
         
         short keysCount = 1;
-        RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, 
"test", "rollup",Lists.newArrayList(), 1, 1,
-                KeysType.AGG_KEYS, keysCount);
+        List<Column> columns = Lists.newArrayList();
+        String mvColumnName 
=CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + "c1";
+        Column column = new Column(mvColumnName, Type.BITMAP, false, 
AggregateType.BITMAP_UNION, false, "1", "");
+        columns.add(column);
+        RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, 
"test", "rollup", columns, 1, 1,
+                KeysType.AGG_KEYS, keysCount,
+                new OriginStatement("create materialized view rollup as select 
bitmap_union(to_bitmap(c1)) from test",
+                        0));
         rollupJobV2.setStorageFormat(TStorageFormat.V2);
 
         // write rollup job
@@ -383,15 +395,37 @@ public class RollupJobV2Test {
         out.flush();
         out.close();
 
+        List<MVColumnItem> itemList = Lists.newArrayList();
+        MVColumnItem item = new MVColumnItem(
+                mvColumnName);
+        List<Expr> params = Lists.newArrayList();
+        SlotRef param1 = new SlotRef(new TableName(null, "test"), "c1");
+        params.add(param1);
+        item.setDefineExpr(new FunctionCallExpr(new FunctionName("to_bitmap"), 
params));
+        itemList.add(item);
+        new Expectations() {
+            {
+                stmt.getMVColumnItemList();
+                result = itemList;
+            }
+        };
+
         // read objects from file
         MetaContext metaContext = new MetaContext();
-        metaContext.setMetaVersion(FeMetaVersion.VERSION_85);
+        metaContext.setMetaVersion(FeMetaVersion.VERSION_86);
         metaContext.setThreadLocalInfo();
-        DataInputStream in = new DataInputStream(new FileInputStream(file));
 
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
         RollupJobV2 result = (RollupJobV2) AlterJobV2.read(in);
-        Catalog.getCurrentCatalogJournalVersion();
         Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat"));
+        List<Column> resultColumns = Deencapsulation.getField(result, "rollupSchema");
+        Assert.assertEquals(1, resultColumns.size());
+        Column resultColumn1 = resultColumns.get(0);
+        Assert.assertEquals(mvColumnName,
+                resultColumn1.getName());
+        Assert.assertTrue(resultColumn1.getDefineExpr() instanceof FunctionCallExpr);
+        FunctionCallExpr resultFunctionCall = (FunctionCallExpr) resultColumn1.getDefineExpr();
+        Assert.assertEquals("to_bitmap", resultFunctionCall.getFnName().getFunction());
 
     }
 }
diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index 91a745b..8697028 100644
--- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -53,9 +53,11 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.meta.MetaContext;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.FakeTransactionIDGenerator;
 import org.apache.doris.transaction.GlobalTransactionMgr;
@@ -66,6 +68,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -74,6 +82,8 @@ import java.util.Map;
 
 public class SchemaChangeJobV2Test {
 
+    private static String fileName = "./SchemaChangeV2Test";
+
     private static FakeEditLog fakeEditLog;
     private static FakeCatalog fakeCatalog;
     private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
@@ -372,4 +382,33 @@ public class SchemaChangeJobV2Test {
         modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p", DynamicPartitionProperty.ENABLE);
         modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30", DynamicPartitionProperty.ENABLE);
     }
+
+    @Test
+    public void testSerializeOfSchemaChangeJob() throws IOException {
+        // prepare file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+
+        SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2(1, 1,1, "test",600000);
+        schemaChangeJobV2.setStorageFormat(TStorageFormat.V2);
+        Deencapsulation.setField(schemaChangeJobV2, "jobState", AlterJobV2.JobState.FINISHED);
+
+
+        // write schema change job
+        schemaChangeJobV2.write(out);
+        out.flush();
+        out.close();
+
+        // read objects from file
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeMetaVersion.VERSION_86);
+        metaContext.setThreadLocalInfo();
+
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+        SchemaChangeJobV2 result = (SchemaChangeJobV2) AlterJobV2.read(in);
+        Assert.assertEquals(1, result.getJobId());
+        Assert.assertEquals(AlterJobV2.JobState.FINISHED, result.getJobState());
+        Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat"));
+    }
 }
diff --git a/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java b/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java
index 6f31055..04a46f9 100644
--- a/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java
+++ b/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java
@@ -74,8 +74,7 @@ public class ColumnTest {
         
         // 2. Read objects from file
         DataInputStream dis = new DataInputStream(new FileInputStream(file));
-        Column rColumn1 = new Column();
-        rColumn1.readFields(dis);
+        Column rColumn1 = Column.read(dis);
         Assert.assertEquals("user", rColumn1.getName());
         Assert.assertEquals(PrimitiveType.CHAR, rColumn1.getDataType());
         Assert.assertEquals(AggregateType.SUM, rColumn1.getAggregationType());
diff --git a/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java b/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java
index 9d8af1a..f661e62 100644
--- a/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java
+++ b/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java
@@ -17,6 +17,14 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.analysis.CreateMaterializedViewStmt;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.FunctionName;
+import org.apache.doris.analysis.MVColumnItem;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.thrift.TStorageType;
 
 import com.google.common.collect.Lists;
@@ -33,6 +41,9 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.List;
 
+import mockit.Expectations;
+import mockit.Mocked;
+
 public class MaterializedIndexMetaTest {
 
     private static String fileName = "./MaterializedIndexMetaSerializeTest";
@@ -44,12 +55,13 @@ public class MaterializedIndexMetaTest {
     }
 
     @Test
-    public void testSerializeMaterializedIndexMeta() throws IOException {
+    public void testSerializeMaterializedIndexMeta(@Mocked CreateMaterializedViewStmt stmt) throws IOException {
         // 1. Write objects to file
         File file = new File(fileName);
         file.createNewFile();
         DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
 
+        String mvColumnName = CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + "k1";
         List<Column> schema = Lists.newArrayList();
         schema.add(new Column("k1", Type.TINYINT, true, null, true, "1", 
"abc"));
         schema.add(new Column("k2", Type.SMALLINT, true, null, true, "1", 
"debug"));
@@ -63,19 +75,48 @@ public class MaterializedIndexMetaTest {
         schema.add(new Column("k10", Type.VARCHAR, true, null, true, "1", ""));
         schema.add(new Column("k11", Type.DECIMALV2, true, null, true, "1", 
""));
         schema.add(new Column("k12", Type.INT, true, null, true, "1", ""));
-        schema.add(new Column("v1", Type.INT, true, AggregateType.SUM, true, 
"1", ""));
-        schema.add(new Column("v1", Type.VARCHAR, true, AggregateType.REPLACE, 
true, "1", ""));
+        schema.add(new Column("v1", Type.INT, false, AggregateType.SUM, true, 
"1", ""));
+        schema.add(new Column(mvColumnName, Type.BITMAP, false, 
AggregateType.BITMAP_UNION, false, "1", ""));
         short shortKeyColumnCount = 1;
         MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(1, schema, 1, 1, shortKeyColumnCount,
-                TStorageType.COLUMN, KeysType.DUP_KEYS);
+                TStorageType.COLUMN, KeysType.DUP_KEYS, new OriginStatement(
+                "create materialized view test as select k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12, sum(v1), "
+                        + "bitmap_union(to_bitmap(k1)) from test group by k1, k2, k3, k4, k5, "
+                        + "k6, k7, k8, k9, k10, k11, k12",
+                0));
         indexMeta.write(out);
         out.flush();
         out.close();
 
+        List<MVColumnItem> itemList = Lists.newArrayList();
+        MVColumnItem item = new MVColumnItem(mvColumnName);
+        List<Expr> params = Lists.newArrayList();
+        SlotRef param1 = new SlotRef(new TableName(null, "test"), "c1");
+        params.add(param1);
+        item.setDefineExpr(new FunctionCallExpr(new FunctionName("to_bitmap"), 
params));
+        itemList.add(item);
+        new Expectations() {
+            {
+                stmt.getMVColumnItemList();
+                result = itemList;
+            }
+        };
+
+
         // 2. Read objects from file
         DataInputStream in = new DataInputStream(new FileInputStream(file));
-
         MaterializedIndexMeta readIndexMeta = MaterializedIndexMeta.read(in);
-        Assert.assertEquals(indexMeta, readIndexMeta);
+        Assert.assertEquals(1, readIndexMeta.getIndexId());
+        List<Column> resultColumns = readIndexMeta.getSchema();
+        for (Column column : resultColumns) {
+            if (column.getName().equals(mvColumnName)) {
+                Assert.assertTrue(column.getDefineExpr() instanceof FunctionCallExpr);
+                Assert.assertEquals(Type.BITMAP, column.getType());
+                Assert.assertEquals(AggregateType.BITMAP_UNION, column.getAggregationType());
+                Assert.assertEquals("to_bitmap", ((FunctionCallExpr) column.getDefineExpr()).getFnName().getFunction());
+            } else {
+                Assert.assertEquals(null, column.getDefineExpr());
+            }
+        }
     }
 }
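
Both serialization tests above exercise the same recovery path: the schema
comes back from disk with null define exprs, the mocked
CreateMaterializedViewStmt supplies the MVColumnItem list re-parsed from the
origin stmt, and the define expr is re-attached to the matching column. A
hedged sketch of that matching step (the helper and the item/column accessor
names other than getName, getDefineExpr and getMVColumnItemList are
hypothetical, not the exact Doris methods):

    import java.util.List;

    import org.apache.doris.analysis.CreateMaterializedViewStmt;
    import org.apache.doris.analysis.MVColumnItem;
    import org.apache.doris.catalog.Column;

    public class DefineExprRebuilder {
        // Hypothetical sketch: re-attach define exprs after deserialization.
        static void rebuild(List<Column> schema, CreateMaterializedViewStmt stmt) {
            for (MVColumnItem item : stmt.getMVColumnItemList()) {
                for (Column column : schema) {
                    // Match the deserialized mv column by name, then restore
                    // its define expr (e.g. to_bitmap(k1)) from the origin stmt.
                    if (column.getName().equals(item.getName())) {
                        column.setDefineExpr(item.getDefineExpr());
                    }
                }
            }
        }
    }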
diff --git a/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java b/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java
index 849848d..b39b3c0 100644
--- a/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java
+++ b/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java
@@ -560,10 +560,6 @@ public class TempPartitionTest {
     }
     
     private void testSerializeOlapTable(OlapTable tbl) throws IOException, AnalysisException {
-        MetaContext metaContext = new MetaContext();
-        metaContext.setMetaVersion(FeMetaVersion.VERSION_77);
-        metaContext.setThreadLocalInfo();
-
         // 1. Write objects to file
         File file = new File(tempPartitionFile);
         file.createNewFile();
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index 9c7192a..7991343 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -71,14 +71,10 @@ public class BrokerLoadJobTest {
                                  @Injectable LabelName labelName,
                                  @Injectable DataDescription dataDescription,
                                  @Mocked Catalog catalog,
-                                 @Injectable Database database,
-                                 @Injectable BrokerDesc brokerDesc,
-                                 @Injectable String originStmt) {
+                                 @Injectable Database database) {
         List<DataDescription> dataDescriptionList = Lists.newArrayList();
         dataDescriptionList.add(dataDescription);
 
-        String label = "label";
-        long dbId = 1;
         String tableName = "table";
         String databaseName = "database";
         new Expectations() {
@@ -105,7 +101,7 @@ public class BrokerLoadJobTest {
         };
 
         try {
-            BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, new OriginStatement(originStmt, 0));
+            BrokerLoadJob.fromLoadStmt(loadStmt);
             Assert.fail();
         } catch (DdlException e) {
             System.out.println("could not find table named " + tableName);
@@ -119,8 +115,7 @@ public class BrokerLoadJobTest {
                                  @Injectable LabelName labelName,
                                  @Injectable Database database,
                                  @Injectable OlapTable olapTable,
-                                 @Mocked Catalog catalog,
-                                 @Injectable String originStmt) {
+                                 @Mocked Catalog catalog) {
 
         String label = "label";
         long dbId = 1;
@@ -170,7 +165,7 @@ public class BrokerLoadJobTest {
         };
 
         try {
-            BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, new OriginStatement(originStmt, 0));
+            BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt);
             Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId"));
             Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label"));
             Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state"));
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 5655187..8469aca 100644
--- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -94,6 +94,7 @@ public class RoutineLoadManagerTest {
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
+        createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
                 serverAddress, topicName);
@@ -116,7 +117,7 @@ public class RoutineLoadManagerTest {
             }
         };
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
-        routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, new OriginStatement("dummy", 0));
+        routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt);
 
         Map<String, RoutineLoadJob> idToRoutineLoadJob =
                 Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob");
@@ -162,6 +163,7 @@ public class RoutineLoadManagerTest {
         CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
                                                                                 loadPropertyList, properties,
                                                                                 typeName, customProperties);
+        createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
 
 
         new Expectations() {
@@ -176,7 +178,7 @@ public class RoutineLoadManagerTest {
         };
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
         try {
-            routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, new OriginStatement("dummy", 0));
+            routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt);
             Assert.fail();
         } catch (LoadException | DdlException e) {
             Assert.fail();
diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java b/fe/src/test/java/org/apache/doris/persist/gson/ThriftToJsonTest.java
similarity index 64%
copy from fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java
copy to fe/src/test/java/org/apache/doris/persist/gson/ThriftToJsonTest.java
index c09f2ad..f8c5d46 100644
--- a/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java
+++ b/fe/src/test/java/org/apache/doris/persist/gson/ThriftToJsonTest.java
@@ -17,6 +17,19 @@
 
 package org.apache.doris.persist.gson;
 
-public interface GsonPostProcessable {
-    public void gsonPostProcess();
+import org.apache.doris.thrift.TStorageFormat;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ThriftToJsonTest {
+
+    @Test
+    public void testTEnumToJson() {
+        // write
+        String serializeString = GsonUtils.GSON.toJson(TStorageFormat.V1);
+        // read
+        TStorageFormat tStorageFormat = GsonUtils.GSON.fromJson(serializeString, TStorageFormat.class);
+        Assert.assertEquals(TStorageFormat.V1, tStorageFormat);
+    }
 }
diff --git a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 8484363..c2f701c 100644
--- a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -577,7 +577,7 @@ public class StmtExecutorTest {
         new Expectations(ddlExecutor) {
             {
                 // Mock ddl
-                DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any);
+                DdlExecutor.execute((Catalog) any, (DdlStmt) any);
                 minTimes = 0;
             }
         };
@@ -610,7 +610,7 @@ public class StmtExecutorTest {
         new Expectations(ddlExecutor) {
             {
                 // Mock ddl
-                DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any);
+                DdlExecutor.execute((Catalog) any, (DdlStmt) any);
                 minTimes = 0;
                 result = new DdlException("ddl fail");
             }
@@ -644,7 +644,7 @@ public class StmtExecutorTest {
         new Expectations(ddlExecutor) {
             {
                 // Mock ddl
-                DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any);
+                DdlExecutor.execute((Catalog) any, (DdlStmt) any);
                 minTimes = 0;
                 result = new Exception("bug");
             }

