This is an automated email from the ASF dual-hosted git repository. liuhangyuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 30df9fc Serialize origin stmt in Rollup Job and MV Meta (#3705) 30df9fc is described below commit 30df9fcae96419b6a9674792d6bef19d77d68dee Author: EmmyMiao87 <522274...@qq.com> AuthorDate: Sat May 30 20:17:46 2020 +0800 Serialize origin stmt in Rollup Job and MV Meta (#3705) * Serialize origin stmt in Rollup Job and MV Meta In materialized view 2.0, the define expr is serialized in column. The method is that Doris serializes the origin stmt of Create Materialized View Stmt in RollupJobV2 and MVMeta. The define expr will be extracted from the origin stmt after the meta is deserialized. The define expr is necessary for bitmap and hll materialized view. For example: MV meta: __doris_mv_bitmap_k1, bitmap_union, to_bitmap(k1) Origin stmt: select bitmap_union(to_bitmap(k1)) from table Deserialize meta: __doris_mv_bitmap_k1, bitmap_union, null After extract: the define expr `to_bitmap(k1)` from origin stmt should be extracted. __doris_mv_bitmap_k1, bitmap_union, to_bitmap(k1) (which comes from the origin stmt) Change-Id: Ic2da093188d8985f5e97be5bd094e5d60d82c9a7 * Add comment of read method Change-Id: I4e1e0f4ad0f6e76cdc43e49938de768ec3b0a0e8 * Fix ut Change-Id: I2be257d512bf541f00912a374a2e07a039fc42b4 * Change code style Change-Id: I3ab23f5c94ae781167f498fefde2d96e42e05bf9 --- fe/src/main/java/org/apache/doris/alter/Alter.java | 6 +- .../java/org/apache/doris/alter/AlterJobV2.java | 36 +++- .../doris/alter/MaterializedViewHandler.java | 12 +- .../java/org/apache/doris/alter/RollupJobV2.java | 206 ++++++++++++--------- .../org/apache/doris/alter/SchemaChangeJobV2.java | 167 +++-------------- .../doris/analysis/CreateMaterializedViewStmt.java | 8 +- .../org/apache/doris/analysis/StatementBase.java | 15 +- .../java/org/apache/doris/catalog/Catalog.java | 3 +- .../main/java/org/apache/doris/catalog/Column.java | 40 ++-- .../doris/catalog/MaterializedIndexMeta.java | 48 ++++- 
.../java/org/apache/doris/catalog/OlapTable.java | 11 +- .../main/java/org/apache/doris/common/Config.java | 2 +- .../org/apache/doris/common/FeMetaVersion.java | 4 +- .../apache/doris/load/loadv2/BrokerLoadJob.java | 4 +- .../org/apache/doris/load/loadv2/LoadManager.java | 5 +- .../doris/load/routineload/RoutineLoadManager.java | 5 +- .../doris/persist/gson/GsonPostProcessable.java | 4 +- .../org/apache/doris/persist/gson/GsonUtils.java | 12 +- .../java/org/apache/doris/qe/ConnectProcessor.java | 3 +- .../main/java/org/apache/doris/qe/DdlExecutor.java | 7 +- .../java/org/apache/doris/qe/StmtExecutor.java | 10 +- .../org/apache/doris/alter/RollupJobV2Test.java | 50 ++++- .../apache/doris/alter/SchemaChangeJobV2Test.java | 39 ++++ .../java/org/apache/doris/catalog/ColumnTest.java | 3 +- .../doris/catalog/MaterializedIndexMetaTest.java | 53 +++++- .../apache/doris/catalog/TempPartitionTest.java | 4 - .../doris/load/loadv2/BrokerLoadJobTest.java | 13 +- .../load/routineload/RoutineLoadManagerTest.java | 6 +- .../doris/persist/gson/ThriftToJsonTest.java} | 17 +- .../java/org/apache/doris/qe/StmtExecutorTest.java | 6 +- 30 files changed, 458 insertions(+), 341 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java b/fe/src/main/java/org/apache/doris/alter/Alter.java index 4678d50..9711065 100644 --- a/fe/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/src/main/java/org/apache/doris/alter/Alter.java @@ -80,7 +80,8 @@ public class Alter { clusterHandler.start(); } - public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) throws DdlException, AnalysisException { + public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) + throws DdlException, AnalysisException { String tableName = stmt.getBaseIndexName(); // check db String dbName = stmt.getDBName(); @@ -102,7 +103,8 @@ public class Alter { OlapTable olapTable = (OlapTable) table; olapTable.checkStableAndNormal(db.getClusterName()); - 
((MaterializedViewHandler)materializedViewHandler).processCreateMaterializedView(stmt, db, olapTable); + ((MaterializedViewHandler)materializedViewHandler).processCreateMaterializedView(stmt, db, + olapTable); } finally { db.writeUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java index 62c61cb..3a51472 100644 --- a/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -22,10 +22,13 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.common.Config; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.base.Preconditions; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,17 +62,27 @@ public abstract class AlterJobV2 implements Writable { ROLLUP, SCHEMA_CHANGE } + @SerializedName(value = "type") protected JobType type; + @SerializedName(value = "jobId") protected long jobId; + @SerializedName(value = "jobState") protected JobState jobState; + @SerializedName(value = "dbId") protected long dbId; + @SerializedName(value = "tableId") protected long tableId; + @SerializedName(value = "tableName") protected String tableName; + @SerializedName(value = "errMsg") protected String errMsg = ""; + @SerializedName(value = "createTimeMs") protected long createTimeMs = -1; + @SerializedName(value = "finishedTimeMs") protected long finishedTimeMs = -1; + @SerializedName(value = "timeoutMs") protected long timeoutMs = -1; public AlterJobV2(long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) { @@ -220,15 +233,20 @@ public abstract class 
AlterJobV2 implements Writable { public abstract void replay(AlterJobV2 replayedJob); public static AlterJobV2 read(DataInput in) throws IOException { - JobType type = JobType.valueOf(Text.readString(in)); - switch (type) { - case ROLLUP: - return RollupJobV2.read(in); - case SCHEMA_CHANGE: - return SchemaChangeJobV2.read(in); - default: - Preconditions.checkState(false); - return null; + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86) { + JobType type = JobType.valueOf(Text.readString(in)); + switch (type) { + case ROLLUP: + return RollupJobV2.read(in); + case SCHEMA_CHANGE: + return SchemaChangeJobV2.read(in); + default: + Preconditions.checkState(false); + return null; + } + } else { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, AlterJobV2.class); } } diff --git a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 921350e..1d3fb3a 100644 --- a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -57,6 +57,7 @@ import org.apache.doris.persist.BatchDropInfo; import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.EditLog; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; @@ -89,7 +90,6 @@ import java.util.concurrent.ConcurrentHashMap; public class MaterializedViewHandler extends AlterHandler { private static final Logger LOG = LogManager.getLogger(MaterializedViewHandler.class); public static final String NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX = "__v2_"; - public static final String MATERIALIZED_VIEW_NAME_PRFIX = "__doris_materialized_view_"; public MaterializedViewHandler() { super("materialized view"); @@ -200,7 +200,7 @@ public class MaterializedViewHandler extends 
AlterHandler { // Step2: create mv job RollupJobV2 rollupJobV2 = createMaterializedViewJob(mvIndexName, baseIndexName, mvColumns, addMVClause - .getProperties(), olapTable, db, baseIndexId, addMVClause.getMVKeysType()); + .getProperties(), olapTable, db, baseIndexId, addMVClause.getMVKeysType(), addMVClause.getOrigStmt()); addAlterJobV2(rollupJobV2); @@ -264,7 +264,7 @@ public class MaterializedViewHandler extends AlterHandler { // step 3 create rollup job RollupJobV2 alterJobV2 = createMaterializedViewJob(rollupIndexName, baseIndexName, rollupSchema, addRollupClause.getProperties(), - olapTable, db, baseIndexId, olapTable.getKeysType()); + olapTable, db, baseIndexId, olapTable.getKeysType(), null); rollupNameJobMap.put(addRollupClause.getRollupName(), alterJobV2); logJobIdSet.add(alterJobV2.getJobId()); @@ -313,7 +313,7 @@ public class MaterializedViewHandler extends AlterHandler { */ private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexName, List<Column> mvColumns, Map<String, String> properties, OlapTable - olapTable, Database db, long baseIndexId, KeysType mvKeysType) + olapTable, Database db, long baseIndexId, KeysType mvKeysType, OriginStatement origStmt) throws DdlException, AnalysisException { if (mvKeysType == null) { // assign rollup index's key type, same as base index's @@ -337,7 +337,7 @@ public class MaterializedViewHandler extends AlterHandler { RollupJobV2 mvJob = new RollupJobV2(jobId, dbId, tableId, olapTable.getName(), timeoutMs, baseIndexId, mvIndexId, baseIndexName, mvName, mvColumns, baseSchemaHash, mvSchemaHash, - mvKeysType, mvShortKeyColumnCount); + mvKeysType, mvShortKeyColumnCount, origStmt); String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName(); if (mvName.equals(newStorageFormatIndexName)) { mvJob.setStorageFormat(TStorageFormat.V2); @@ -465,10 +465,8 @@ public class MaterializedViewHandler extends AlterHandler { if (mvColumnItem.getDefineExpr() != null) { if 
(mvAggregationType.equals(AggregateType.BITMAP_UNION)) { newMVColumn.setType(Type.BITMAP); - newMVColumn.setName(MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + baseColumn.getName()); } else if (mvAggregationType.equals(AggregateType.HLL_UNION)){ newMVColumn.setType(Type.HLL); - newMVColumn.setName(MATERIALIZED_VIEW_NAME_PRFIX + "hll_" + baseColumn.getName()); } else { throw new DdlException("The define expr of column is only support bitmap_union or hll_union"); } diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java index aac6d09..e856a76 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -17,6 +17,10 @@ package org.apache.doris.alter; +import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.MVColumnItem; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -36,7 +40,12 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.SqlModeHelper; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; @@ -53,6 +62,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -60,6 +70,7 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.StringReader; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -70,30 +81,45 @@ import java.util.concurrent.TimeUnit; * This is for replacing the old RollupJob * https://github.com/apache/incubator-doris/issues/1429 */ -public class RollupJobV2 extends AlterJobV2 { +public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(RollupJobV2.class); // partition id -> (rollup tablet id -> base tablet id) + @SerializedName(value = "partitionIdToBaseRollupTabletIdMap") private Map<Long, Map<Long, Long>> partitionIdToBaseRollupTabletIdMap = Maps.newHashMap(); + @SerializedName(value = "partitionIdToRollupIndex") private Map<Long, MaterializedIndex> partitionIdToRollupIndex = Maps.newHashMap(); // rollup and base schema info + @SerializedName(value = "baseIndexId") private long baseIndexId; + @SerializedName(value = "rollupIndexId") private long rollupIndexId; + @SerializedName(value = "baseIndexName") private String baseIndexName; + @SerializedName(value = "rollupIndexName") private String rollupIndexName; + @SerializedName(value = "rollupSchema") private List<Column> rollupSchema = Lists.newArrayList(); + @SerializedName(value = "baseSchemaHash") private int baseSchemaHash; + @SerializedName(value = "rollupSchemaHash") private int rollupSchemaHash; + @SerializedName(value = "rollupKeysType") private KeysType rollupKeysType; + @SerializedName(value = "rollupShortKeyColumnCount") private short rollupShortKeyColumnCount; + @SerializedName(value = "origStmt") + private OriginStatement origStmt; // optional + @SerializedName(value = "storageFormat") private TStorageFormat storageFormat = TStorageFormat.DEFAULT; // The rollup job will wait all 
transactions before this txn id finished, then send the rollup tasks. + @SerializedName(value = "watershedTxnId") protected long watershedTxnId = -1; // save all create rollup tasks @@ -101,8 +127,8 @@ public class RollupJobV2 extends AlterJobV2 { public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName, - List<Column> rollupSchema, int baseSchemaHash, int rollupSchemaHash, - KeysType rollupKeysType, short rollupShortKeyColumnCount) { + List<Column> rollupSchema, int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, + short rollupShortKeyColumnCount, OriginStatement origStmt) { super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs); this.baseIndexId = baseIndexId; @@ -115,6 +141,8 @@ public class RollupJobV2 extends AlterJobV2 { this.rollupSchemaHash = rollupSchemaHash; this.rollupKeysType = rollupKeysType; this.rollupShortKeyColumnCount = rollupShortKeyColumnCount; + + this.origStmt = origStmt; } private RollupJobV2() { @@ -278,7 +306,7 @@ public class RollupJobV2 extends AlterJobV2 { } tbl.setIndexMeta(rollupIndexId, rollupIndexName, rollupSchema, 0 /* init schema version */, - rollupSchemaHash, rollupShortKeyColumnCount,TStorageType.COLUMN, rollupKeysType); + rollupSchemaHash, rollupShortKeyColumnCount,TStorageType.COLUMN, rollupKeysType, origStmt); } /** @@ -506,90 +534,6 @@ public class RollupJobV2 extends AlterJobV2 { return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId)); } - public static RollupJobV2 read(DataInput in) throws IOException { - RollupJobV2 rollupJob = new RollupJobV2(); - rollupJob.readFields(in); - return rollupJob; - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - out.writeInt(partitionIdToRollupIndex.size()); - for (long partitionId : partitionIdToRollupIndex.keySet()) { - 
out.writeLong(partitionId); - - out.writeInt(partitionIdToBaseRollupTabletIdMap.get(partitionId).size()); - for (Map.Entry<Long, Long> entry : partitionIdToBaseRollupTabletIdMap.get(partitionId).entrySet()) { - out.writeLong(entry.getKey()); - out.writeLong(entry.getValue()); - } - - MaterializedIndex rollupIndex = partitionIdToRollupIndex.get(partitionId); - rollupIndex.write(out); - } - - out.writeLong(baseIndexId); - out.writeLong(rollupIndexId); - Text.writeString(out, baseIndexName); - Text.writeString(out, rollupIndexName); - - // rollup schema - out.writeInt(rollupSchema.size()); - for (Column column : rollupSchema) { - column.write(out); - } - out.writeInt(baseSchemaHash); - out.writeInt(rollupSchemaHash); - - Text.writeString(out, rollupKeysType.name()); - out.writeShort(rollupShortKeyColumnCount); - - out.writeLong(watershedTxnId); - Text.writeString(out, storageFormat.name()); - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - - int size = in.readInt(); - for (int i = 0; i < size; i++) { - long partitionId = in.readLong(); - int size2 = in.readInt(); - Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap.computeIfAbsent(partitionId, k -> Maps.newHashMap()); - for (int j = 0; j < size2; j++) { - long rollupTabletId = in.readLong(); - long baseTabletId = in.readLong(); - tabletIdMap.put(rollupTabletId, baseTabletId); - } - - partitionIdToRollupIndex.put(partitionId, MaterializedIndex.read(in)); - } - - baseIndexId = in.readLong(); - rollupIndexId = in.readLong(); - baseIndexName = Text.readString(in); - rollupIndexName = Text.readString(in); - - size = in.readInt(); - for (int i = 0; i < size; i++) { - Column column = Column.read(in); - rollupSchema.add(column); - } - baseSchemaHash = in.readInt(); - rollupSchemaHash = in.readInt(); - - rollupKeysType = KeysType.valueOf(Text.readString(in)); - rollupShortKeyColumnCount = in.readShort(); - - watershedTxnId = in.readLong(); - if 
(Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_85) { - storageFormat = TStorageFormat.valueOf(Text.readString(in)); - } - } - /** * Replay job in PENDING state. * Should replay all changes before this job's state transfer to PENDING. @@ -773,4 +717,90 @@ public class RollupJobV2 extends AlterJobV2 { this.jobState = jobState; } + private void setColumnsDefineExpr(List<MVColumnItem> items) { + for (MVColumnItem item : items) { + for (Column column : rollupSchema) { + if (column.getName().equals(item.getName())) { + column.setDefineExpr(item.getDefineExpr()); + break; + } + } + } + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this, AlterJobV2.class); + Text.writeString(out, json); + } + + /** + * This method is only used to deserialize the text mate which version is less then 86. + * If the meta version >=86, it will be deserialized by the `read` of AlterJobV2 rather then here. + */ + public static RollupJobV2 read(DataInput in) throws IOException { + Preconditions.checkState(Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86); + RollupJobV2 rollupJob = new RollupJobV2(); + rollupJob.readFields(in); + return rollupJob; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + int size = in.readInt(); + for (int i = 0; i < size; i++) { + long partitionId = in.readLong(); + int size2 = in.readInt(); + Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap.computeIfAbsent(partitionId, k -> Maps.newHashMap()); + for (int j = 0; j < size2; j++) { + long rollupTabletId = in.readLong(); + long baseTabletId = in.readLong(); + tabletIdMap.put(rollupTabletId, baseTabletId); + } + + partitionIdToRollupIndex.put(partitionId, MaterializedIndex.read(in)); + } + + baseIndexId = in.readLong(); + rollupIndexId = in.readLong(); + baseIndexName = Text.readString(in); + rollupIndexName = Text.readString(in); + + size = 
in.readInt(); + for (int i = 0; i < size; i++) { + Column column = Column.read(in); + rollupSchema.add(column); + } + baseSchemaHash = in.readInt(); + rollupSchemaHash = in.readInt(); + + rollupKeysType = KeysType.valueOf(Text.readString(in)); + rollupShortKeyColumnCount = in.readShort(); + + watershedTxnId = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_85) { + storageFormat = TStorageFormat.valueOf(Text.readString(in)); + } + } + + @Override + public void gsonPostProcess() throws IOException { + // analyze define stmt + if (origStmt == null) { + return; + } + // parse the define stmt to schema + SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt), + SqlModeHelper.MODE_DEFAULT)); + CreateMaterializedViewStmt stmt; + try { + stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, origStmt.idx); + stmt.analyzeSelectClause(); + setColumnsDefineExpr(stmt.getMVColumnItemList()); + } catch (Exception e) { + throw new IOException("error happens when parsing create materialized view stmt: " + origStmt, e); + } + } } diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 71236ec..08838eb 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -38,6 +38,7 @@ import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; @@ -57,8 +58,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.collect.Table; import com.google.common.collect.Table.Cell; +import 
com.google.gson.annotations.SerializedName; -import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -81,35 +82,44 @@ public class SchemaChangeJobV2 extends AlterJobV2 { private static final Logger LOG = LogManager.getLogger(SchemaChangeJobV2.class); // partition id -> (shadow index id -> (shadow tablet id -> origin tablet id)) + @SerializedName(value = "partitionIndexTabletMap") private Table<Long, Long, Map<Long, Long>> partitionIndexTabletMap = HashBasedTable.create(); // partition id -> (shadow index id -> shadow index)) private Table<Long, Long, MaterializedIndex> partitionIndexMap = HashBasedTable.create(); // shadow index id -> origin index id + @SerializedName(value = "indexIdMap") private Map<Long, Long> indexIdMap = Maps.newHashMap(); // shadow index id -> shadow index name(__doris_shadow_xxx) + @SerializedName(value = "indexIdToName") private Map<Long, String> indexIdToName = Maps.newHashMap(); // shadow index id -> index schema + @SerializedName(value = "indexSchemaMap") private Map<Long, List<Column>> indexSchemaMap = Maps.newHashMap(); // shadow index id -> (shadow index schema version : schema hash) + @SerializedName(value = "indexSchemaVersionAndHashMap") private Map<Long, Pair<Integer, Integer>> indexSchemaVersionAndHashMap = Maps.newHashMap(); // shadow index id -> shadow index short key count + @SerializedName(value = "indexShortKeyMap") private Map<Long, Short> indexShortKeyMap = Maps.newHashMap(); - // identify whether the job is finished and no need to persist some data - private boolean isMetaPruned = false; - // bloom filter info + @SerializedName(value = "hasBfChange") private boolean hasBfChange; + @SerializedName(value = "bfColumns") private Set<String> bfColumns = null; + @SerializedName(value = "bfFpp") private double bfFpp = 0; // alter index info + @SerializedName(value = "indexChange") private boolean indexChange = false; + @SerializedName(value = 
"indexes") private List<Index> indexes = null; // The schema change job will wait all transactions before this txn id finished, then send the schema change tasks. + @SerializedName(value = "watershedTxnId") protected long watershedTxnId = -1; - + @SerializedName(value = "storageFormat") private TStorageFormat storageFormat = TStorageFormat.DEFAULT; // save all schema change tasks @@ -170,7 +180,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { partitionIndexMap.clear(); indexSchemaMap.clear(); indexShortKeyMap.clear(); - isMetaPruned = true; } /** @@ -642,12 +651,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId)); } - public static SchemaChangeJobV2 read(DataInput in) throws IOException { - SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(); - schemaChangeJob.readFields(in); - return schemaChangeJob; - } - /** * Replay job in PENDING state. * Should replay all changes before this job's state transfer to PENDING. 
@@ -827,80 +830,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } /** - * write data need to persist when job not finish - */ - private void writeJobNotFinishData(DataOutput out) throws IOException { - out.writeInt(partitionIndexTabletMap.rowKeySet().size()); - for (Long partitionId : partitionIndexTabletMap.rowKeySet()) { - out.writeLong(partitionId); - Map<Long, Map<Long, Long>> indexTabletMap = partitionIndexTabletMap.row(partitionId); - out.writeInt(indexTabletMap.size()); - for (Long shadowIndexId : indexTabletMap.keySet()) { - out.writeLong(shadowIndexId); - // tablet id map - Map<Long, Long> tabletMap = indexTabletMap.get(shadowIndexId); - out.writeInt(tabletMap.size()); - for (Map.Entry<Long, Long> entry : tabletMap.entrySet()) { - out.writeLong(entry.getKey()); - out.writeLong(entry.getValue()); - } - // shadow index - MaterializedIndex shadowIndex = partitionIndexMap.get(partitionId, shadowIndexId); - shadowIndex.write(out); - } - } - - // shadow index info - out.writeInt(indexIdMap.size()); - for (Map.Entry<Long, Long> entry : indexIdMap.entrySet()) { - long shadowIndexId = entry.getKey(); - out.writeLong(shadowIndexId); - // index id map - out.writeLong(entry.getValue()); - // index name - Text.writeString(out, indexIdToName.get(shadowIndexId)); - // index schema - out.writeInt(indexSchemaMap.get(shadowIndexId).size()); - for (Column column : indexSchemaMap.get(shadowIndexId)) { - column.write(out); - } - // index schema version and hash - out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).first); - out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).second); - // short key count - out.writeShort(indexShortKeyMap.get(shadowIndexId)); - } - - // bloom filter - out.writeBoolean(hasBfChange); - if (hasBfChange) { - out.writeInt(bfColumns.size()); - for (String bfCol : bfColumns) { - Text.writeString(out, bfCol); - } - out.writeDouble(bfFpp); - } - - out.writeLong(watershedTxnId); - - // index - out.writeBoolean(indexChange); - 
if (indexChange) { - if (CollectionUtils.isNotEmpty(indexes)) { - out.writeBoolean(true); - out.writeInt(indexes.size()); - for (Index index : indexes) { - index.write(out); - } - } else { - out.writeBoolean(false); - } - } - - Text.writeString(out, storageFormat.name()); - } - - /** * read data need to persist when job not finish */ private void readJobNotFinishData(DataInput in) throws IOException { @@ -983,53 +912,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } /** - * write data need to persist when job finished - */ - private void writeJobFinishedData(DataOutput out) throws IOException { - // only persist data will be used in getInfo - out.writeInt(indexIdMap.size()); - for (Entry<Long, Long> entry : indexIdMap.entrySet()) { - long shadowIndexId = entry.getKey(); - out.writeLong(shadowIndexId); - // index id map - out.writeLong(entry.getValue()); - // index name - Text.writeString(out, indexIdToName.get(shadowIndexId)); - // index schema version and hash - out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).first); - out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).second); - } - - // bloom filter - out.writeBoolean(hasBfChange); - if (hasBfChange) { - out.writeInt(bfColumns.size()); - for (String bfCol : bfColumns) { - Text.writeString(out, bfCol); - } - out.writeDouble(bfFpp); - } - - out.writeLong(watershedTxnId); - - // index - out.writeBoolean(indexChange); - if (indexChange) { - if (CollectionUtils.isNotEmpty(indexes)) { - out.writeBoolean(true); - out.writeInt(indexes.size()); - for (Index index : indexes) { - index.write(out); - } - } else { - out.writeBoolean(false); - } - } - - Text.writeString(out, storageFormat.name()); - } - - /** * read data need to persist when job finished */ private void readJobFinishedData(DataInput in) throws IOException { @@ -1082,14 +964,19 @@ public class SchemaChangeJobV2 extends AlterJobV2 { @Override public void write(DataOutput out) throws IOException { - super.write(out); + String json 
= GsonUtils.GSON.toJson(this, AlterJobV2.class); + Text.writeString(out, json); + } - out.writeBoolean(isMetaPruned); - if (isMetaPruned) { - writeJobFinishedData(out); - } else { - writeJobNotFinishData(out); - } + /** + * This method is only used to deserialize the text mate which version is less then 86. + * If the meta version >=86, it will be deserialized by the `read` of AlterJobV2 rather then here. + */ + public static SchemaChangeJobV2 read(DataInput in) throws IOException { + Preconditions.checkState(Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86); + SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(); + schemaChangeJob.readFields(in); + return schemaChangeJob; } @Override diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java index 5690be3..56ec28d 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java @@ -49,6 +49,8 @@ import java.util.Set; * [PROPERTIES ("key" = "value")] */ public class CreateMaterializedViewStmt extends DdlStmt { + public static final String MATERIALIZED_VIEW_NAME_PRFIX = "__doris_materialized_view_"; + private String mvName; private SelectStmt selectStmt; private Map<String, String> properties; @@ -100,9 +102,8 @@ public class CreateMaterializedViewStmt extends DdlStmt { @Override public void analyze(Analyzer analyzer) throws UserException { - // TODO(ml): remove it if (!Config.enable_materialized_view) { - throw new AnalysisException("The materialized view is coming soon"); + throw new AnalysisException("The materialized view is disabled"); } super.analyze(analyzer); FeNameFormat.checkTableName(mvName); @@ -128,7 +129,7 @@ public class CreateMaterializedViewStmt extends DdlStmt { } } - private void analyzeSelectClause() throws AnalysisException { + public void analyzeSelectClause() 
throws AnalysisException { SelectList selectList = selectStmt.getSelectList(); if (selectList.getItems().isEmpty()) { throw new AnalysisException("The materialized view must contain at least one column"); @@ -200,6 +201,7 @@ public class CreateMaterializedViewStmt extends DdlStmt { beginIndexOfAggregation = i; } // TODO(ml): support different type of column, int -> bigint(sum) + // TODO: change the column name of bitmap and hll MVColumnItem mvColumnItem = new MVColumnItem(columnName); mvColumnItem.setAggregationType(AggregateType.valueOf(functionName.toUpperCase()), false); mvColumnItem.setDefineExpr(defineExpr); diff --git a/fe/src/main/java/org/apache/doris/analysis/StatementBase.java b/fe/src/main/java/org/apache/doris/analysis/StatementBase.java index 7410028..3fd62aa 100644 --- a/fe/src/main/java/org/apache/doris/analysis/StatementBase.java +++ b/fe/src/main/java/org/apache/doris/analysis/StatementBase.java @@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.rewrite.ExprRewriter; import com.google.common.base.Preconditions; @@ -46,6 +47,8 @@ public abstract class StatementBase implements ParseNode { // END: Members that need to be reset() ///////////////////////////////////////// + private OriginStatement origStmt; + protected StatementBase() { } /** @@ -153,7 +156,17 @@ public abstract class StatementBase implements ParseNode { public void setClusterName(String clusterName) { this.clusterName = clusterName; - } + } + + public void setOrigStmt(OriginStatement origStmt) { + Preconditions.checkState(origStmt != null); + this.origStmt = origStmt; + } + + public OriginStatement getOrigStmt() { + return origStmt; + } + /** * Resets the internal analysis state of this node. 
* For easier maintenance, class members that need to be reset are grouped into diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 4c44409..cf95e6d 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4903,7 +4903,8 @@ public class Catalog { this.alter.processAlterView(stmt, ConnectContext.get()); } - public void createMaterializedView(CreateMaterializedViewStmt stmt) throws AnalysisException, DdlException { + public void createMaterializedView(CreateMaterializedViewStmt stmt) + throws AnalysisException, DdlException { this.alter.processCreateMaterializedView(stmt); } diff --git a/fe/src/main/java/org/apache/doris/catalog/Column.java b/fe/src/main/java/org/apache/doris/catalog/Column.java index d1ce8a6..461cf6a 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/src/main/java/org/apache/doris/catalog/Column.java @@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TColumnType; @@ -67,7 +68,7 @@ public class Column implements Writable { private String comment; @SerializedName(value = "stats") private ColumnStats stats; // cardinality and selectivity etc. 
- private Expr defineExpr; //use to define materialize view + private Expr defineExpr; // used to define a column in a materialized view public Column() { this.name = ""; @@ -411,31 +412,11 @@ public class Column implements Writable { @Override public void write(DataOutput out) throws IOException { - Text.writeString(out, name); - ColumnType.write(out, type); - if (null == aggregationType) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - Text.writeString(out, aggregationType.name()); - out.writeBoolean(isAggregationTypeImplicit); - } - - out.writeBoolean(isKey); - out.writeBoolean(isAllowNull); - - if (defaultValue == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - Text.writeString(out, defaultValue); - } - stats.write(out); - - Text.writeString(out, comment); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } - public void readFields(DataInput in) throws IOException { + private void readFields(DataInput in) throws IOException { name = Text.readString(in); type = ColumnType.read(in); boolean notNull = in.readBoolean(); @@ -475,8 +456,13 @@ public class Column implements Writable { } public static Column read(DataInput in) throws IOException { - Column column = new Column(); - column.readFields(in); - return column; + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86) { + Column column = new Column(); + column.readFields(in); + return column; + } else { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, Column.class); + } } } diff --git a/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java b/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java index 25a8bb5..2e224f9 100644 --- a/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java +++ b/fe/src/main/java/org/apache/doris/catalog/MaterializedIndexMeta.java @@ -17,9 +17,17 @@ package org.apache.doris.catalog; +import 
org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.MVColumnItem; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.SqlModeHelper; import org.apache.doris.thrift.TStorageType; import com.google.common.base.Preconditions; @@ -29,9 +37,10 @@ import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.StringReader; import java.util.List; -public class MaterializedIndexMeta implements Writable { +public class MaterializedIndexMeta implements Writable, GsonPostProcessable { @SerializedName(value = "indexId") private long indexId; @SerializedName(value = "schema") @@ -46,9 +55,11 @@ public class MaterializedIndexMeta implements Writable { private TStorageType storageType; @SerializedName(value = "keysType") private KeysType keysType; + @SerializedName(value = "defineStmt") + private OriginStatement defineStmt; - public MaterializedIndexMeta(long indexId, List<Column> schema, int schemaVersion, int - schemaHash, short shortKeyColumnCount, TStorageType storageType, KeysType keysType) { + public MaterializedIndexMeta(long indexId, List<Column> schema, int schemaVersion, int schemaHash, + short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement defineStmt) { this.indexId = indexId; Preconditions.checkState(schema != null); Preconditions.checkState(schema.size() != 0); @@ -60,6 +71,7 @@ public class MaterializedIndexMeta implements Writable { this.storageType = storageType; Preconditions.checkState(keysType != null); this.keysType = keysType; + this.defineStmt = 
defineStmt; } public long getIndexId() { @@ -94,6 +106,17 @@ public class MaterializedIndexMeta implements Writable { return schemaVersion; } + private void setColumnsDefineExpr(List<MVColumnItem> items) { + for (MVColumnItem item : items) { + for (Column column : schema) { + if (column.getName().equals(item.getName())) { + column.setDefineExpr(item.getDefineExpr()); + break; + } + } + } + } + @Override public boolean equals(Object obj) { if (!(obj instanceof MaterializedIndexMeta)) { @@ -134,4 +157,23 @@ public class MaterializedIndexMeta implements Writable { return GsonUtils.GSON.fromJson(json, MaterializedIndexMeta.class); } + @Override + public void gsonPostProcess() throws IOException { + // analyze define stmt + if (defineStmt == null) { + return; + } + // parse the define stmt to schema + SqlParser parser = new SqlParser(new SqlScanner(new StringReader(defineStmt.originStmt), + SqlModeHelper.MODE_DEFAULT)); + CreateMaterializedViewStmt stmt; + try { + stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, defineStmt.idx); + stmt.analyzeSelectClause(); + setColumnsDefineExpr(stmt.getMVColumnItemList()); + } catch (Exception e) { + throw new IOException("error happens when parsing create materialized view stmt: " + defineStmt, e); + } + } + } diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java index f307a5f..b4d60f6 100644 --- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -38,6 +38,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.RangeUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TOlapTable; import org.apache.doris.thrift.TStorageFormat; @@ -246,6 +247,12 @@ public class OlapTable 
extends Table { public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int schemaHash, short shortKeyColumnCount, TStorageType storageType, KeysType keysType) { + setIndexMeta(indexId, indexName, schema, schemaVersion, schemaHash, shortKeyColumnCount, storageType, keysType, + null); + } + + public void setIndexMeta(long indexId, String indexName, List<Column> schema, int schemaVersion, int schemaHash, + short shortKeyColumnCount, TStorageType storageType, KeysType keysType, OriginStatement origStmt) { // Nullable when meta comes from schema change log replay. // The replay log only save the index id, so we need to get name by id. if (indexName == null) { @@ -268,7 +275,7 @@ public class OlapTable extends Table { } MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema, schemaVersion, - schemaHash, shortKeyColumnCount, storageType, keysType); + schemaHash, shortKeyColumnCount, storageType, keysType, origStmt); indexIdToMeta.put(indexId, indexMeta); indexNameToId.put(indexName, indexId); } @@ -970,7 +977,7 @@ public class OlapTable extends Table { // The keys type in here is incorrect MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(indexId, schema, - schemaVersion, schemaHash, shortKeyColumnCount, storageType, KeysType.AGG_KEYS); + schemaVersion, schemaHash, shortKeyColumnCount, storageType, KeysType.AGG_KEYS, null); tmpIndexMetaList.add(indexMeta); } else { MaterializedIndexMeta indexMeta = MaterializedIndexMeta.read(in); diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 5cdf4f6..c29921f 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -1060,7 +1060,7 @@ public class Config extends ConfigBase { * control materialized view */ @ConfField(mutable = true, masterOnly = true) - public static boolean enable_materialized_view = false; + public 
static boolean enable_materialized_view = true; /** * it can't auto-resume routine load job as long as one of the backends is down diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index f4d1f8c..dabd7a8 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -181,6 +181,8 @@ public final class FeMetaVersion { public static final int VERSION_84 = 84; // add storage format in rollup job public static final int VERSION_85 = 85; + // serialize origStmt in rollupJob and mv meta + public static final int VERSION_86 = 86; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_85; + public static final int VERSION_CURRENT = VERSION_86; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index c4d7d2c..8d817f0 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -124,7 +124,7 @@ public class BrokerLoadJob extends LoadJob { } } - public static BrokerLoadJob fromLoadStmt(LoadStmt stmt, OriginStatement originStmt) throws DdlException { + public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { // get db id String dbName = stmt.getLabel().getDbName(); Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName()); @@ -135,7 +135,7 @@ public class BrokerLoadJob extends LoadJob { // create job try { BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(), - stmt.getBrokerDesc(), originStmt); + stmt.getBrokerDesc(), stmt.getOrigStmt()); brokerLoadJob.setJobProperties(stmt.getProperties()); brokerLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions()); return 
brokerLoadJob; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index e123e41..e0cb91a 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -39,7 +39,6 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; -import org.apache.doris.qe.OriginStatement; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TMiniLoadBeginRequest; import org.apache.doris.thrift.TMiniLoadRequest; @@ -98,7 +97,7 @@ public class LoadManager implements Writable{ * @param stmt * @throws DdlException */ - public void createLoadJobFromStmt(LoadStmt stmt, OriginStatement originStmt) throws DdlException { + public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException { Database database = checkDb(stmt.getLabel().getDbName()); long dbId = database.getId(); LoadJob loadJob = null; @@ -112,7 +111,7 @@ public class LoadManager implements Writable{ throw new DdlException("There are more then " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, " + "please retry later."); } - loadJob = BrokerLoadJob.fromLoadStmt(stmt, originStmt); + loadJob = BrokerLoadJob.fromLoadStmt(stmt); createLoadJob(loadJob); } finally { writeUnlock(); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 0efb6fe..061d7fb 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -37,7 +37,6 @@ import org.apache.doris.common.util.LogKey; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.RoutineLoadOperation; import 
org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.OriginStatement; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -117,7 +116,7 @@ public class RoutineLoadManager implements Writable { } - public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt, OriginStatement origStmt) + public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt) throws UserException { // check load auth if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), @@ -141,7 +140,7 @@ public class RoutineLoadManager implements Writable { throw new UserException("Unknown data source type: " + type); } - routineLoadJob.setOrigStmt(origStmt); + routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt()); addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName()); } diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java b/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java index c09f2ad..6733d79 100644 --- a/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java +++ b/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java @@ -17,6 +17,8 @@ package org.apache.doris.persist.gson; +import java.io.IOException; + public interface GsonPostProcessable { - public void gsonPostProcess(); + public void gsonPostProcess() throws IOException; } diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 033ab03..10e1ad7 100644 --- a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -17,6 +17,9 @@ package org.apache.doris.persist.gson; +import org.apache.doris.alter.AlterJobV2; +import org.apache.doris.alter.RollupJobV2; +import org.apache.doris.alter.SchemaChangeJobV2; import org.apache.doris.catalog.DistributionInfo; import 
org.apache.doris.catalog.Resource; import org.apache.doris.catalog.HashDistributionInfo; @@ -93,6 +96,12 @@ public class GsonUtils { .of(Resource.class, "clazz") .registerSubtype(SparkResource.class, SparkResource.class.getSimpleName()); + // runtime adapter for class "AlterJobV2" + private static RuntimeTypeAdapterFactory<AlterJobV2> alterJobV2TypeAdapterFactory = RuntimeTypeAdapterFactory + .of(AlterJobV2.class, "clazz") + .registerSubtype(RollupJobV2.class, RollupJobV2.class.getSimpleName()) + .registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName()); + // the builder of GSON instance. // Add any other adapters if necessary. private static final GsonBuilder GSON_BUILDER = new GsonBuilder() @@ -103,7 +112,8 @@ public class GsonUtils { .registerTypeAdapterFactory(new PostProcessTypeAdapterFactory()) .registerTypeAdapterFactory(columnTypeAdapterFactory) .registerTypeAdapterFactory(distributionInfoTypeAdapterFactory) - .registerTypeAdapterFactory(resourceTypeAdapterFactory); + .registerTypeAdapterFactory(resourceTypeAdapterFactory) + .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory); // this instance is thread-safe. 
public static final Gson GSON = GSON_BUILDER.create(); diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java index d94733a..83ef187 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -178,7 +178,8 @@ public class ConnectProcessor { ctx.resetRetureRows(); } parsedStmt = stmts.get(i); - executor = new StmtExecutor(ctx, parsedStmt, new OriginStatement(originStmt, i)); + parsedStmt.setOrigStmt(new OriginStatement(originStmt, i)); + executor = new StmtExecutor(ctx, parsedStmt); executor.execute(); if (i != stmts.size() - 1) { diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java index 284daf6..db1a0e5 100644 --- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -82,8 +82,7 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.Load; public class DdlExecutor { - public static void execute(Catalog catalog, DdlStmt ddlStmt, OriginStatement origStmt) - throws DdlException, QueryStateException, Exception { + public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception { if (ddlStmt instanceof CreateClusterStmt) { CreateClusterStmt stmt = (CreateClusterStmt) ddlStmt; catalog.createCluster(stmt); @@ -132,7 +131,7 @@ public class DdlExecutor { if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) { catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis()); } else { - catalog.getLoadManager().createLoadJobFromStmt(loadStmt, origStmt); + catalog.getLoadManager().createLoadJobFromStmt(loadStmt); } } else if (ddlStmt instanceof CancelLoadStmt) { if (catalog.getLoadInstance().isLabelExist( @@ -142,7 +141,7 @@ public class DdlExecutor { 
catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt); } } else if (ddlStmt instanceof CreateRoutineLoadStmt) { - catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt, origStmt); + catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt); } else if (ddlStmt instanceof PauseRoutineLoadStmt) { catalog.getRoutineLoadManager().pauseRoutineLoadJob((PauseRoutineLoadStmt) ddlStmt); } else if (ddlStmt instanceof ResumeRoutineLoadStmt) { diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index b5ccb65..114f9cf 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -128,12 +128,12 @@ public class StmtExecutor { } // constructor for receiving parsed stmt from connect processor - public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt, OriginStatement originStmt) { + public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt) { this.context = ctx; - this.originStmt = originStmt; + this.parsedStmt = parsedStmt; + this.originStmt = parsedStmt.getOrigStmt(); this.serializer = context.getSerializer(); this.isProxy = false; - this.parsedStmt = parsedStmt; } // At the end of query execution, we begin to add up profile @@ -376,7 +376,7 @@ public class StmtExecutor { SqlParser parser = new SqlParser(input); try { parsedStmt = SqlParserUtils.getStmt(parser, originStmt.idx); - + parsedStmt.setOrigStmt(originStmt); } catch (Error e) { LOG.info("error happened when parsing stmt {}, id: {}", originStmt, context.getStmtId(), e); throw new AnalysisException("sql parsing error, please check your sql"); @@ -889,7 +889,7 @@ public class StmtExecutor { private void handleDdlStmt() { try { - DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt, originStmt); + DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt); context.getState().setOk(); } 
catch (QueryStateException e) { context.setState(e.getQueryState()); diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 35429d0..e3fd856 100644 --- a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -24,6 +24,13 @@ import org.apache.doris.analysis.AccessTestUtil; import org.apache.doris.analysis.AddRollupClause; import org.apache.doris.analysis.AlterClause; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.FunctionName; +import org.apache.doris.analysis.MVColumnItem; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.CatalogTestUtil; @@ -34,7 +41,6 @@ import org.apache.doris.catalog.FakeEditLog; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; -import org.apache.doris.catalog.MaterializedIndexMeta; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.catalog.Partition; @@ -48,6 +54,7 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.UserException; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.meta.MetaContext; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.thrift.TStorageFormat; @@ -60,7 +67,6 @@ import com.google.common.collect.Lists; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import 
org.junit.BeforeClass; import org.junit.Test; import java.io.DataInputStream; @@ -367,15 +373,21 @@ public class RollupJobV2Test { @Test - public void testSerializeOfRollupJob() throws IOException { + public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt) throws IOException { // prepare file File file = new File(fileName); file.createNewFile(); DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); short keysCount = 1; - RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup",Lists.newArrayList(), 1, 1, - KeysType.AGG_KEYS, keysCount); + List<Column> columns = Lists.newArrayList(); + String mvColumnName =CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + "c1"; + Column column = new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", ""); + columns.add(column); + RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, 1, 1, + KeysType.AGG_KEYS, keysCount, + new OriginStatement("create materialized view rollup as select bitmap_union(to_bitmap(c1)) from test", + 0)); rollupJobV2.setStorageFormat(TStorageFormat.V2); // write rollup job @@ -383,15 +395,37 @@ public class RollupJobV2Test { out.flush(); out.close(); + List<MVColumnItem> itemList = Lists.newArrayList(); + MVColumnItem item = new MVColumnItem( + mvColumnName); + List<Expr> params = Lists.newArrayList(); + SlotRef param1 = new SlotRef(new TableName(null, "test"), "c1"); + params.add(param1); + item.setDefineExpr(new FunctionCallExpr(new FunctionName("to_bitmap"), params)); + itemList.add(item); + new Expectations() { + { + stmt.getMVColumnItemList(); + result = itemList; + } + }; + // read objects from file MetaContext metaContext = new MetaContext(); - metaContext.setMetaVersion(FeMetaVersion.VERSION_85); + metaContext.setMetaVersion(FeMetaVersion.VERSION_86); metaContext.setThreadLocalInfo(); - DataInputStream in = new DataInputStream(new 
FileInputStream(file)); + DataInputStream in = new DataInputStream(new FileInputStream(file)); RollupJobV2 result = (RollupJobV2) AlterJobV2.read(in); - Catalog.getCurrentCatalogJournalVersion(); Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat")); + List<Column> resultColumns = Deencapsulation.getField(result, "rollupSchema"); + Assert.assertEquals(1, resultColumns.size()); + Column resultColumn1 = resultColumns.get(0); + Assert.assertEquals(mvColumnName, + resultColumn1.getName()); + Assert.assertTrue(resultColumn1.getDefineExpr() instanceof FunctionCallExpr); + FunctionCallExpr resultFunctionCall = (FunctionCallExpr) resultColumn1.getDefineExpr(); + Assert.assertEquals("to_bitmap", resultFunctionCall.getFnName().getFunction()); } } diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index 91a745b..8697028 100644 --- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -53,9 +53,11 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.UserException; +import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.meta.MetaContext; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.FakeTransactionIDGenerator; import org.apache.doris.transaction.GlobalTransactionMgr; @@ -66,6 +68,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; 
import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.HashMap; @@ -74,6 +82,8 @@ import java.util.Map; public class SchemaChangeJobV2Test { + private static String fileName = "./SchemaChangeV2Test"; + private static FakeEditLog fakeEditLog; private static FakeCatalog fakeCatalog; private static FakeTransactionIDGenerator fakeTransactionIDGenerator; @@ -372,4 +382,33 @@ public class SchemaChangeJobV2Test { modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p", DynamicPartitionProperty.ENABLE); modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30", DynamicPartitionProperty.ENABLE); } + + @Test + public void testSerializeOfSchemaChangeJob() throws IOException { + // prepare file + File file = new File(fileName); + file.createNewFile(); + DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + + SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2(1, 1,1, "test",600000); + schemaChangeJobV2.setStorageFormat(TStorageFormat.V2); + Deencapsulation.setField(schemaChangeJobV2, "jobState", AlterJobV2.JobState.FINISHED); + + + // write schema change job + schemaChangeJobV2.write(out); + out.flush(); + out.close(); + + // read objects from file + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeMetaVersion.VERSION_86); + metaContext.setThreadLocalInfo(); + + DataInputStream in = new DataInputStream(new FileInputStream(file)); + SchemaChangeJobV2 result = (SchemaChangeJobV2) AlterJobV2.read(in); + Assert.assertEquals(1, result.getJobId()); + Assert.assertEquals(AlterJobV2.JobState.FINISHED, result.getJobState()); + Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat")); + } } diff --git a/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java b/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java index 6f31055..04a46f9 100644 --- a/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java 
+++ b/fe/src/test/java/org/apache/doris/catalog/ColumnTest.java @@ -74,8 +74,7 @@ public class ColumnTest { // 2. Read objects from file DataInputStream dis = new DataInputStream(new FileInputStream(file)); - Column rColumn1 = new Column(); - rColumn1.readFields(dis); + Column rColumn1 = Column.read(dis); Assert.assertEquals("user", rColumn1.getName()); Assert.assertEquals(PrimitiveType.CHAR, rColumn1.getDataType()); Assert.assertEquals(AggregateType.SUM, rColumn1.getAggregationType()); diff --git a/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java b/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java index 9d8af1a..f661e62 100644 --- a/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/MaterializedIndexMetaTest.java @@ -17,6 +17,14 @@ package org.apache.doris.catalog; +import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.FunctionName; +import org.apache.doris.analysis.MVColumnItem; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TableName; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TStorageType; import com.google.common.collect.Lists; @@ -33,6 +41,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.List; +import mockit.Expectations; +import mockit.Mocked; + public class MaterializedIndexMetaTest { private static String fileName = "./MaterializedIndexMetaSerializeTest"; @@ -44,12 +55,13 @@ public class MaterializedIndexMetaTest { } @Test - public void testSerializeMaterializedIndexMeta() throws IOException { + public void testSerializeMaterializedIndexMeta(@Mocked CreateMaterializedViewStmt stmt) throws IOException { // 1. 
Write objects to file File file = new File(fileName); file.createNewFile(); DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + String mvColumnName = CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + "k1"; List<Column> schema = Lists.newArrayList(); schema.add(new Column("k1", Type.TINYINT, true, null, true, "1", "abc")); schema.add(new Column("k2", Type.SMALLINT, true, null, true, "1", "debug")); @@ -63,19 +75,48 @@ public class MaterializedIndexMetaTest { schema.add(new Column("k10", Type.VARCHAR, true, null, true, "1", "")); schema.add(new Column("k11", Type.DECIMALV2, true, null, true, "1", "")); schema.add(new Column("k12", Type.INT, true, null, true, "1", "")); - schema.add(new Column("v1", Type.INT, true, AggregateType.SUM, true, "1", "")); - schema.add(new Column("v1", Type.VARCHAR, true, AggregateType.REPLACE, true, "1", "")); + schema.add(new Column("v1", Type.INT, false, AggregateType.SUM, true, "1", "")); + schema.add(new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", "")); short shortKeyColumnCount = 1; MaterializedIndexMeta indexMeta = new MaterializedIndexMeta(1, schema, 1, 1, shortKeyColumnCount, - TStorageType.COLUMN, KeysType.DUP_KEYS); + TStorageType.COLUMN, KeysType.DUP_KEYS, new OriginStatement( + "create materialized view test as select k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12, sum(v1), " + + "bitmap_union(to_bitmap(k1)) from test group by k1, k2, k3, k4, k5, " + + "k6, k7, k8, k9, k10, k11, k12", + 0)); indexMeta.write(out); out.flush(); out.close(); + List<MVColumnItem> itemList = Lists.newArrayList(); + MVColumnItem item = new MVColumnItem(mvColumnName); + List<Expr> params = Lists.newArrayList(); + SlotRef param1 = new SlotRef(new TableName(null, "test"), "c1"); + params.add(param1); + item.setDefineExpr(new FunctionCallExpr(new FunctionName("to_bitmap"), params)); + itemList.add(item); + new Expectations() { + { + stmt.getMVColumnItemList(); + result = 
itemList; + } + }; + + // 2. Read objects from file DataInputStream in = new DataInputStream(new FileInputStream(file)); - MaterializedIndexMeta readIndexMeta = MaterializedIndexMeta.read(in); - Assert.assertEquals(indexMeta, readIndexMeta); + Assert.assertEquals(1, readIndexMeta.getIndexId()); + List<Column> resultColumns = readIndexMeta.getSchema(); + for (Column column : resultColumns) { + if (column.getName().equals(mvColumnName)) { + Assert.assertTrue(column.getDefineExpr() instanceof FunctionCallExpr); + Assert.assertEquals(Type.BITMAP, column.getType()); + Assert.assertEquals(AggregateType.BITMAP_UNION, column.getAggregationType()); + Assert.assertEquals("to_bitmap", ((FunctionCallExpr) column.getDefineExpr()).getFnName().getFunction()); + } else { + Assert.assertEquals(null, column.getDefineExpr()); + } + } } } diff --git a/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java b/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java index 849848d..b39b3c0 100644 --- a/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/TempPartitionTest.java @@ -560,10 +560,6 @@ public class TempPartitionTest { } private void testSerializeOlapTable(OlapTable tbl) throws IOException, AnalysisException { - MetaContext metaContext = new MetaContext(); - metaContext.setMetaVersion(FeMetaVersion.VERSION_77); - metaContext.setThreadLocalInfo(); - // 1. 
Write objects to file File file = new File(tempPartitionFile); file.createNewFile(); diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index 9c7192a..7991343 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -71,14 +71,10 @@ public class BrokerLoadJobTest { @Injectable LabelName labelName, @Injectable DataDescription dataDescription, @Mocked Catalog catalog, - @Injectable Database database, - @Injectable BrokerDesc brokerDesc, - @Injectable String originStmt) { + @Injectable Database database) { List<DataDescription> dataDescriptionList = Lists.newArrayList(); dataDescriptionList.add(dataDescription); - String label = "label"; - long dbId = 1; String tableName = "table"; String databaseName = "database"; new Expectations() { @@ -105,7 +101,7 @@ public class BrokerLoadJobTest { }; try { - BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, new OriginStatement(originStmt, 0)); + BrokerLoadJob.fromLoadStmt(loadStmt); Assert.fail(); } catch (DdlException e) { System.out.println("could not find table named " + tableName); @@ -119,8 +115,7 @@ public class BrokerLoadJobTest { @Injectable LabelName labelName, @Injectable Database database, @Injectable OlapTable olapTable, - @Mocked Catalog catalog, - @Injectable String originStmt) { + @Mocked Catalog catalog) { String label = "label"; long dbId = 1; @@ -170,7 +165,7 @@ public class BrokerLoadJobTest { }; try { - BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, new OriginStatement(originStmt, 0)); + BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt); Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId")); Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label")); Assert.assertEquals(JobState.PENDING, 
Deencapsulation.getField(brokerLoadJob, "state")); diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 5655187..8469aca 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -94,6 +94,7 @@ public class RoutineLoadManagerTest { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties); + createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, serverAddress, topicName); @@ -116,7 +117,7 @@ public class RoutineLoadManagerTest { } }; RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, new OriginStatement("dummy", 0)); + routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt); Map<String, RoutineLoadJob> idToRoutineLoadJob = Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob"); @@ -162,6 +163,7 @@ public class RoutineLoadManagerTest { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties); + createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); new Expectations() { @@ -176,7 +178,7 @@ public class RoutineLoadManagerTest { }; RoutineLoadManager routineLoadManager = new RoutineLoadManager(); try { - routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, new OriginStatement("dummy", 0)); + routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt); Assert.fail(); } catch (LoadException | DdlException e) { Assert.fail(); diff --git 
a/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java b/fe/src/test/java/org/apache/doris/persist/gson/ThriftToJsonTest.java similarity index 64% copy from fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java copy to fe/src/test/java/org/apache/doris/persist/gson/ThriftToJsonTest.java index c09f2ad..f8c5d46 100644 --- a/fe/src/main/java/org/apache/doris/persist/gson/GsonPostProcessable.java +++ b/fe/src/test/java/org/apache/doris/persist/gson/ThriftToJsonTest.java @@ -17,6 +17,19 @@ package org.apache.doris.persist.gson; -public interface GsonPostProcessable { - public void gsonPostProcess(); +import org.apache.doris.thrift.TStorageFormat; + +import org.junit.Assert; +import org.junit.Test; + +public class ThriftToJsonTest { + + @Test + public void testTEnumToJson() { + // write + String serializeString = GsonUtils.GSON.toJson(TStorageFormat.V1); + // read + TStorageFormat tStorageFormat = GsonUtils.GSON.fromJson(serializeString, TStorageFormat.class); + Assert.assertEquals(TStorageFormat.V1, tStorageFormat); + } } diff --git a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index 8484363..c2f701c 100644 --- a/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -577,7 +577,7 @@ public class StmtExecutorTest { new Expectations(ddlExecutor) { { // Mock ddl - DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any); + DdlExecutor.execute((Catalog) any, (DdlStmt) any); minTimes = 0; } }; @@ -610,7 +610,7 @@ public class StmtExecutorTest { new Expectations(ddlExecutor) { { // Mock ddl - DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any); + DdlExecutor.execute((Catalog) any, (DdlStmt) any); minTimes = 0; result = new DdlException("ddl fail"); } @@ -644,7 +644,7 @@ public class StmtExecutorTest { new Expectations(ddlExecutor) { { // Mock ddl - 
DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any); + DdlExecutor.execute((Catalog) any, (DdlStmt) any); minTimes = 0; result = new Exception("bug"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org