This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new aacb9b9b66 [Enhancement](binlog) Add create/drop table, add/drop paritition && alter job, modify columns binlog support (#21544) aacb9b9b66 is described below commit aacb9b9b668fc51b4090fbf87d98bd92d9f209e9 Author: Jack Drogon <jack.xsuper...@gmail.com> AuthorDate: Sun Jul 9 09:11:56 2023 +0800 [Enhancement](binlog) Add create/drop table, add/drop paritition && alter job, modify columns binlog support (#21544) --- .../main/java/org/apache/doris/alter/Alter.java | 2 +- .../java/org/apache/doris/alter/AlterHandler.java | 11 ++- .../java/org/apache/doris/alter/AlterJobV2.java | 10 +- .../doris/alter/MaterializedViewHandler.java | 26 +++--- .../java/org/apache/doris/alter/RollupJobV2.java | 19 ++-- .../apache/doris/alter/SchemaChangeHandler.java | 31 ++++--- .../org/apache/doris/alter/SchemaChangeJobV2.java | 9 +- .../java/org/apache/doris/alter/SystemHandler.java | 5 +- .../apache/doris/analysis/AbstractBackupStmt.java | 8 ++ .../java/org/apache/doris/analysis/IndexDef.java | 26 +++--- .../org/apache/doris/analysis/RestoreStmt.java | 4 + .../org/apache/doris/binlog/BinlogManager.java | 52 ++++++++++- .../org/apache/doris/binlog/CreateTableRecord.java | 102 +++++++++++++++++++++ .../java/org/apache/doris/binlog/DBBinlog.java | 13 +++ .../org/apache/doris/binlog/DropTableRecord.java | 59 ++++++++++++ .../java/org/apache/doris/binlog/TableBinlog.java | 16 ++-- .../java/org/apache/doris/persist/EditLog.java | 22 ++++- .../doris/persist/TableAddOrDropColumnsInfo.java | 9 +- .../persist/TableAddOrDropInvertedIndicesInfo.java | 5 +- .../apache/doris/service/FrontendServiceImpl.java | 2 + .../org/apache/doris/alter/RollupJobV2Test.java | 2 +- .../apache/doris/alter/SchemaChangeJobV2Test.java | 2 +- .../persist/TableAddOrDropColumnsInfoTest.java | 2 +- gensrc/thrift/FrontendService.thrift | 2 + 24 files changed, 368 insertions(+), 71 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 55a87cdf9b..a82209a866 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -219,7 +219,7 @@ public class Alter { ((SchemaChangeHandler) schemaChangeHandler).updateBinlogConfig(db, olapTable, alterClauses); } else if (currentAlterOps.hasSchemaChangeOp()) { // if modify storage type to v2, do schema change to convert all related tablets to segment v2 format - schemaChangeHandler.process(alterClauses, clusterName, db, olapTable); + schemaChangeHandler.process(stmt.toSql(), alterClauses, clusterName, db, olapTable); } else if (currentAlterOps.hasRollupOp()) { materializedViewHandler.process(alterClauses, clusterName, db, olapTable); } else if (currentAlterOps.hasPartitionOp()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 80a0e33393..7ab2f8732d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -173,9 +173,18 @@ public abstract class AlterHandler extends MasterDaemon { /* * entry function. handle alter ops */ - public abstract void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable) + public abstract void process(String rawSql, List<AlterClause> alterClauses, String clusterName, Database db, + OlapTable olapTable) throws UserException; + /* + * entry function. handle alter ops + */ + public void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable) + throws UserException { + process("", alterClauses, clusterName, db, olapTable); + } + /* * entry function. handle alter ops for external table */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java index 7004fc2834..fb616fe429 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -86,8 +86,12 @@ public abstract class AlterJobV2 implements Writable { protected long finishedTimeMs = -1; @SerializedName(value = "timeoutMs") protected long timeoutMs = -1; + @SerializedName(value = "rawSql") + protected String rawSql; - public AlterJobV2(long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) { + public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, long tableId, String tableName, + long timeoutMs) { + this.rawSql = rawSql; this.jobId = jobId; this.type = jobType; this.dbId = dbId; @@ -240,4 +244,8 @@ public abstract class AlterJobV2 implements Writable { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, AlterJobV2.class); } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 10dddcd94d..af1915b6fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -208,10 +208,11 @@ public class MaterializedViewHandler extends AlterHandler { List<Column> mvColumns = checkAndPrepareMaterializedView(addMVClause, olapTable); // Step2: create mv job - RollupJobV2 rollupJobV2 = createMaterializedViewJob(mvIndexName, baseIndexName, mvColumns, - addMVClause.getWhereClauseItemExpr(olapTable), - addMVClause.getProperties(), olapTable, db, baseIndexId, - addMVClause.getMVKeysType(), addMVClause.getOrigStmt()); + RollupJobV2 rollupJobV2 = + createMaterializedViewJob(addMVClause.toSql(), mvIndexName, baseIndexName, mvColumns, + addMVClause.getWhereClauseItemExpr(olapTable), + addMVClause.getProperties(), olapTable, db, baseIndexId, + addMVClause.getMVKeysType(), addMVClause.getOrigStmt()); addAlterJobV2(rollupJobV2); @@ -236,7 +237,7 @@ public class MaterializedViewHandler extends AlterHandler { * @throws DdlException * @throws AnalysisException */ - public void processBatchAddRollup(List<AlterClause> alterClauses, Database db, OlapTable olapTable) + public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses, Database db, OlapTable olapTable) throws DdlException, AnalysisException { checkReplicaCount(olapTable); Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>(); @@ -285,8 +286,10 @@ public class MaterializedViewHandler extends AlterHandler { addRollupClause, olapTable, baseIndexId, changeStorageFormat); // step 3 create rollup job - RollupJobV2 alterJobV2 = createMaterializedViewJob(rollupIndexName, baseIndexName, rollupSchema, null, - addRollupClause.getProperties(), olapTable, db, baseIndexId, olapTable.getKeysType(), null); + RollupJobV2 alterJobV2 = + createMaterializedViewJob(rawSql, rollupIndexName, baseIndexName, rollupSchema, null, + addRollupClause.getProperties(), olapTable, db, baseIndexId, olapTable.getKeysType(), + null); rollupNameJobMap.put(addRollupClause.getRollupName(), alterJobV2); logJobIdSet.add(alterJobV2.getJobId()); @@ -335,7 +338,7 @@ public class MaterializedViewHandler extends AlterHandler { * @throws DdlException * @throws AnalysisException */ - private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexName, + private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, String baseIndexName, List<Column> mvColumns, Column whereColumn, Map<String, String> properties, OlapTable olapTable, Database db, long baseIndexId, KeysType mvKeysType, OriginStatement origStmt) throws DdlException, AnalysisException { @@ -364,7 +367,7 @@ public class MaterializedViewHandler extends AlterHandler { IdGeneratorBuffer idGeneratorBuffer = env.getIdGeneratorBuffer(bufferSize); long jobId = idGeneratorBuffer.getNextId(); long mvIndexId = idGeneratorBuffer.getNextId(); - RollupJobV2 mvJob = new RollupJobV2(jobId, dbId, tableId, olapTable.getName(), timeoutMs, + RollupJobV2 mvJob = new RollupJobV2(rawSql, jobId, dbId, tableId, olapTable.getName(), timeoutMs, baseIndexId, mvIndexId, baseIndexName, mvName, mvColumns, whereColumn, baseSchemaHash, mvSchemaHash, mvKeysType, mvShortKeyColumnCount, origStmt); @@ -1196,7 +1199,8 @@ public class MaterializedViewHandler extends AlterHandler { } @Override - public void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable) + public void process(String rawSql, List<AlterClause> alterClauses, String clusterName, Database db, + OlapTable olapTable) throws DdlException, AnalysisException, MetaNotFoundException { if (olapTable.isDuplicateWithoutKey()) { throw new DdlException("Duplicate table without keys do not support alter rollup!"); @@ -1204,7 +1208,7 @@ public class MaterializedViewHandler extends AlterHandler { Optional<AlterClause> alterClauseOptional = alterClauses.stream().findAny(); if (alterClauseOptional.isPresent()) { if (alterClauseOptional.get() instanceof AddRollupClause) { - processBatchAddRollup(alterClauses, db, olapTable); + processBatchAddRollup(rawSql, alterClauses, db, olapTable); } else if (alterClauseOptional.get() instanceof DropRollupClause) { processBatchDropRollup(alterClauses, db, olapTable); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 908fffc6bd..e05130b103 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -148,12 +148,14 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { super(JobType.ROLLUP); } - public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId, - long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema, - Column whereColumn, - int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, short rollupShortKeyColumnCount, - OriginStatement origStmt) throws AnalysisException { - super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs); + public RollupJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName, long timeoutMs, + long baseIndexId, + long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema, + Column whereColumn, + int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, + short rollupShortKeyColumnCount, + OriginStatement origStmt) throws AnalysisException { + super(rawSql, jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs); this.baseIndexId = baseIndexId; this.rollupIndexId = rollupIndexId; @@ -883,4 +885,9 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { } setColumnsDefineExpr(stmt.getMVColumnItemList()); } + + @Override + public String toJson() { + return GsonUtils.GSON.toJson(this); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 8075443439..ff5d54f8a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1171,7 +1171,7 @@ public class SchemaChangeHandler extends AlterHandler { } } - private void createJob(long dbId, OlapTable olapTable, Map<Long, LinkedList<Column>> indexSchemaMap, + private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long, LinkedList<Column>> indexSchemaMap, Map<String, String> propertyMap, List<Index> indexes) throws UserException { checkReplicaCount(olapTable); @@ -1463,8 +1463,9 @@ public class SchemaChangeHandler extends AlterHandler { long bufferSize = IdGeneratorUtil.getBufferSizeForAlterTable(olapTable, changedIndexIdToSchema.keySet()); IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize); long jobId = idGeneratorBuffer.getNextId(); - SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(jobId, dbId, olapTable.getId(), olapTable.getName(), - timeoutSecond * 1000); + SchemaChangeJobV2 schemaChangeJob = + new SchemaChangeJobV2(rawSql, jobId, dbId, olapTable.getId(), olapTable.getName(), + timeoutSecond * 1000); schemaChangeJob.setBloomFilterInfo(hasBfChange, bfColumns, bfFpp); schemaChangeJob.setAlterIndexInfo(hasIndexChange, indexes); @@ -1741,7 +1742,8 @@ public class SchemaChangeHandler extends AlterHandler { } @Override - public void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable) + public void process(String rawSql, List<AlterClause> alterClauses, String clusterName, Database db, + OlapTable olapTable) throws UserException { olapTable.writeLockOrDdlException(); try { @@ -1985,18 +1987,18 @@ public class SchemaChangeHandler extends AlterHandler { if (lightSchemaChange) { long jobId = Env.getCurrentEnv().getNextId(); //for schema change add/drop value column optimize, direct modify table meta. - modifyTableLightSchemaChange(db, olapTable, indexSchemaMap, newIndexes, + modifyTableLightSchemaChange(rawSql, db, olapTable, indexSchemaMap, newIndexes, null, isDropIndex, jobId, false); } else if (lightIndexChange) { long jobId = Env.getCurrentEnv().getNextId(); //for schema change add/drop inverted index optimize, direct modify table meta firstly. - modifyTableLightSchemaChange(db, olapTable, indexSchemaMap, newIndexes, + modifyTableLightSchemaChange(rawSql, db, olapTable, indexSchemaMap, newIndexes, alterIndexes, isDropIndex, jobId, false); } else if (buildIndexChange) { buildOrDeleteTableInvertedIndices(db, olapTable, indexSchemaMap, alterIndexes, invertedIndexOnPartitions, false); } else { - createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes); + createJob(rawSql, db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes); } } finally { olapTable.writeUnlock(); @@ -2459,7 +2461,7 @@ public class SchemaChangeHandler extends AlterHandler { } // the invoker should keep table's write lock - public void modifyTableLightSchemaChange(Database db, OlapTable olapTable, + public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable olapTable, Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes, List<Index> alterIndexes, boolean isDropIndex, long jobId, boolean isReplay) @@ -2488,7 +2490,7 @@ public class SchemaChangeHandler extends AlterHandler { //for compatibility, we need create a finished state schema change job v2 - SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(jobId, db.getId(), olapTable.getId(), + SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(rawSql, jobId, db.getId(), olapTable.getId(), olapTable.getName(), 1000); for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) { @@ -2517,9 +2519,8 @@ public class SchemaChangeHandler extends AlterHandler { if (alterIndexes != null) { if (!isReplay) { - TableAddOrDropInvertedIndicesInfo info = new TableAddOrDropInvertedIndicesInfo( - db.getId(), olapTable.getId(), indexSchemaMap, indexes, - alterIndexes, isDropIndex, jobId); + TableAddOrDropInvertedIndicesInfo info = new TableAddOrDropInvertedIndicesInfo(rawSql, db.getId(), + olapTable.getId(), indexSchemaMap, indexes, alterIndexes, isDropIndex, jobId); LOG.debug("logModifyTableAddOrDropInvertedIndices info:{}", info); Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info); @@ -2542,7 +2543,7 @@ public class SchemaChangeHandler extends AlterHandler { olapTable.getName(), jobId, isReplay); } else { if (!isReplay) { - TableAddOrDropColumnsInfo info = new TableAddOrDropColumnsInfo(db.getId(), olapTable.getId(), + TableAddOrDropColumnsInfo info = new TableAddOrDropColumnsInfo(rawSql, db.getId(), olapTable.getId(), indexSchemaMap, indexes, jobId); LOG.debug("logModifyTableAddOrDropColumns info:{}", info); Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropColumns(info); @@ -2564,7 +2565,7 @@ public class SchemaChangeHandler extends AlterHandler { OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP); olapTable.writeLock(); try { - modifyTableLightSchemaChange(db, olapTable, indexSchemaMap, indexes, null, false, jobId, true); + modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, indexes, null, false, jobId, true); } catch (DdlException e) { // should not happen LOG.warn("failed to replay modify table add or drop or modify columns", e); @@ -2695,7 +2696,7 @@ public class SchemaChangeHandler extends AlterHandler { OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP); olapTable.writeLock(); try { - modifyTableLightSchemaChange(db, olapTable, indexSchemaMap, newIndexes, + modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, newIndexes, alterIndexes, isDropIndex, jobId, true); } catch (UserException e) { // should not happen diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 54dac1039f..cc10efa951 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -140,8 +140,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 { super(JobType.SCHEMA_CHANGE); } - public SchemaChangeJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs) { - super(jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs); + public SchemaChangeJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName, long timeoutMs) { + super(rawSql, jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs); } public void addTabletIdMap(long partitionId, long shadowIdxId, long shadowTabletId, long originTabletId) { @@ -937,4 +937,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 { String json = GsonUtils.GSON.toJson(this, AlterJobV2.class); Text.writeString(out, json); } + + @Override + public String toJson() { + return GsonUtils.GSON.toJson(this); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index c886740c8f..9695200a79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -105,8 +105,9 @@ public class SystemHandler extends AlterHandler { @Override // add synchronized to avoid process 2 or more stmts at same time - public synchronized void process(List<AlterClause> alterClauses, String clusterName, Database dummyDb, - OlapTable dummyTbl) throws UserException { + public synchronized void process(String rawSql, List<AlterClause> alterClauses, String clusterName, + Database dummyDb, + OlapTable dummyTbl) throws UserException { Preconditions.checkArgument(alterClauses.size() == 1); AlterClause alterClause = alterClauses.get(0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java index 76d570ab6e..a268047043 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java @@ -141,4 +141,12 @@ public class AbstractBackupStmt extends DdlStmt { public long getTimeoutMs() { return timeoutMs; } + + public void setProperty(String key, String value) { + properties.put(key, value); + } + + public void removeProperty(String key) { + properties.remove(key); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/IndexDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/IndexDef.java index 9aa0a4685a..ab1584c5f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/IndexDef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/IndexDef.java @@ -70,8 +70,8 @@ public class IndexDef { this.properties = properties; } if (indexType == IndexType.NGRAM_BF) { - properties.putIfAbsent(NGRAM_SIZE_KEY, DEFAULT_NGRAM_SIZE); - properties.putIfAbsent(NGRAM_BF_SIZE_KEY, DEFAULT_NGRAM_BF_SIZE); + this.properties.putIfAbsent(NGRAM_SIZE_KEY, DEFAULT_NGRAM_SIZE); + this.properties.putIfAbsent(NGRAM_BF_SIZE_KEY, DEFAULT_NGRAM_BF_SIZE); } } @@ -122,23 +122,25 @@ public class IndexDef { if (tableName != null && !tableName.isEmpty()) { sb.append(" ON ").append(tableName); } - sb.append(" ("); - boolean first = true; - for (String col : columns) { - if (first) { - first = false; - } else { - sb.append(","); + if (columns != null && columns.size() > 0) { + sb.append(" ("); + boolean first = true; + for (String col : columns) { + if (first) { + first = false; + } else { + sb.append(","); + } + sb.append("`" + col + "`"); } - sb.append("`" + col + "`"); + sb.append(")"); } - sb.append(")"); if (indexType != null) { sb.append(" USING ").append(indexType.toString()); } if (properties != null && properties.size() > 0) { sb.append(" PROPERTIES("); - first = true; + boolean first = true; for (Map.Entry<String, String> e : properties.entrySet()) { if (first) { first = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java index 2382093d6f..895ad48ce9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java @@ -98,6 +98,10 @@ public class RestoreStmt extends AbstractBackupStmt { return jobInfo; } + public void disableDynamicPartition() { + setProperty(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, "false"); + } + @Override public void analyze(Analyzer analyzer) throws UserException { if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 822f045da8..63e773af4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -17,12 +17,14 @@ package org.apache.doris.binlog; +import org.apache.doris.alter.AlterJobV2; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.persist.BinlogGcInfo; import org.apache.doris.persist.DropPartitionInfo; +import org.apache.doris.persist.TableAddOrDropColumnsInfo; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; @@ -129,6 +131,18 @@ public class BinlogManager { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); } + public void addCreateTableRecord(CreateTableRecord createTableRecord) { + long dbId = createTableRecord.getDbId(); + List<Long> tableIds = new ArrayList<Long>(); + tableIds.add(createTableRecord.getTableId()); + long commitSeq = createTableRecord.getCommitSeq(); + long timestamp = -1; + TBinlogType type = TBinlogType.CREATE_TABLE; + String data = createTableRecord.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + } + public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) { long dbId = dropPartitionInfo.getDbId(); List<Long> tableIds = new ArrayList<Long>(); @@ -140,6 +154,40 @@ public class BinlogManager { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); } + public void addDropTableRecord(DropTableRecord record) { + long dbId = record.getDbId(); + List<Long> tableIds = new ArrayList<Long>(); + tableIds.add(record.getTableId()); + long commitSeq = record.getCommitSeq(); + long timestamp = -1; + TBinlogType type = TBinlogType.DROP_TABLE; + String data = record.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + } + + public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) { + long dbId = alterJob.getDbId(); + List<Long> tableIds = new ArrayList<Long>(); + tableIds.add(alterJob.getTableId()); + long timestamp = -1; + TBinlogType type = TBinlogType.ALTER_JOB; + String data = alterJob.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + } + + public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) { + long dbId = info.getDbId(); + List<Long> tableIds = new ArrayList<Long>(); + tableIds.add(info.getTableId()); + long timestamp = -1; + TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data); + } + // get binlog by dbId, return first binlog.version > version public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); @@ -329,10 +377,6 @@ public class BinlogManager { } public long read(DataInputStream dis, long checksum) throws IOException { - if (!Config.enable_feature_binlog) { - return checksum; - } - // Step 1: read binlogs length int size = dis.readInt(); LOG.info("read binlogs length: {}", size); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/CreateTableRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/CreateTableRecord.java new file mode 100644 index 0000000000..50557195b4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/CreateTableRecord.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.binlog; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Table; +import org.apache.doris.persist.CreateTableInfo; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +public class CreateTableRecord { + private static final Logger LOG = LogManager.getLogger(CreateTableRecord.class); + + @SerializedName(value = "commitSeq") + private long commitSeq; + @SerializedName(value = "dbId") + private long dbId; + @SerializedName(value = "tableId") + private long tableId; + @SerializedName(value = "sql") + private String sql; + + public CreateTableRecord(long commitSeq, CreateTableInfo info) { + Table table = info.getTable(); + + this.commitSeq = commitSeq; + this.tableId = table.getId(); + String dbName = info.getDbName(); + + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName); + if (db == null) { + LOG.warn("db not found. dbId: {}", dbId); + this.dbId = -1L; + } else { + this.dbId = db.getId(); + } + + List<String> createTableStmt = Lists.newArrayList(); + List<String> addPartitionStmt = Lists.newArrayList(); + List<String> createRollupStmt = Lists.newArrayList(); + + table.readLock(); + try { + Env.getDdlStmt(table, createTableStmt, addPartitionStmt, createRollupStmt, false, false /* show password */, + -1L); + } finally { + table.readUnlock(); + } + if (createTableStmt.size() > 0) { + this.sql = createTableStmt.get(0); + } else { + this.sql = ""; + } + } + + public long getCommitSeq() { + return commitSeq; + } + + public long getDbId() { + return dbId; + } + + public long getTableId() { + return tableId; + } + + public String getSql() { + return sql; + } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + @Override + public String toString() { + return toJson(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 48c20becaf..0f113ff491 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -83,6 +83,19 @@ public class DBBinlog { return; } + // HACK: for metadata fix + if (!binlog.isSetType()) { + return; + } + switch (binlog.getType()) { + case CREATE_TABLE: + return; + case DROP_TABLE: + return; + default: + break; + } + for (long tableId : tableIds) { TableBinlog tableBinlog = tableBinlogMap.get(tableId); if (tableBinlog == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java new file mode 100644 index 0000000000..dd5adb36c7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DropTableRecord.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.binlog; + +import org.apache.doris.persist.DropInfo; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +public class DropTableRecord { + @SerializedName(value = "commitSeq") + private long commitSeq; + @SerializedName(value = "dbId") + private long dbId; + @SerializedName(value = "tableId") + private long tableId; + + public DropTableRecord(long commitSeq, DropInfo info) { + this.commitSeq = commitSeq; + this.dbId = info.getDbId(); + this.tableId = info.getTableId(); + } + + public long getCommitSeq() { + return commitSeq; + } + + public long getDbId() { + return dbId; + } + + public long getTableId() { + return tableId; + } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + @Override + public String toString() { + return toJson(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 44545b6fb3..2b0d45b694 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -149,15 +149,17 @@ public class TableBinlog { Iterator<TBinlog> iter = binlogs.iterator(); while (iter.hasNext()) { TBinlog binlog = iter.next(); - if (binlog.getTimestamp() <= expireMs) { - if (binlog.getType() == TBinlogType.UPSERT) { - tombstoneUpsert = binlog; - } - largestExpiredCommitSeq = binlog.getCommitSeq(); - iter.remove(); - } else { + long timestamp = binlog.getTimestamp(); + + if (timestamp > expireMs) { break; } + + if (binlog.getType() == TBinlogType.UPSERT) { + tombstoneUpsert = binlog; + } + largestExpiredCommitSeq = binlog.getCommitSeq(); + iter.remove(); } } finally { lock.writeLock().unlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index b67ca9259e..b92b2cb980 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -25,6 +25,8 @@ import org.apache.doris.backup.BackupJob; import org.apache.doris.backup.Repository; import org.apache.doris.backup.RestoreJob; import org.apache.doris.binlog.AddPartitionRecord; +import org.apache.doris.binlog.CreateTableRecord; +import org.apache.doris.binlog.DropTableRecord; import org.apache.doris.binlog.UpsertRecord; import org.apache.doris.blockrule.SqlBlockRule; import org.apache.doris.catalog.BrokerMgr; @@ -210,7 +212,9 @@ public class EditLog { CreateTableInfo info = (CreateTableInfo) journal.getData(); LOG.info("Begin to unprotect create table. db = " + info.getDbName() + " table = " + info.getTable() .getId()); + CreateTableRecord record = new CreateTableRecord(logId, info); env.replayCreateTable(info.getDbName(), info.getTable()); + env.getBinlogManager().addCreateTableRecord(record); break; } case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: { @@ -225,7 +229,9 @@ public class EditLog { Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(info.getDbId()); LOG.info("Begin to unprotect drop table. db = " + db.getFullName() + " table = " + info.getTableId()); + DropTableRecord record = new DropTableRecord(logId, info); env.replayDropTable(db, info.getTableId(), info.isForceDrop(), info.getRecycleTime()); + env.getBinlogManager().addDropTableRecord(record); break; } case OperationType.OP_ADD_PARTITION: { @@ -715,6 +721,7 @@ public class EditLog { default: break; } + env.getBinlogManager().addAlterJobV2(alterJob, logId); break; } case OperationType.OP_UPDATE_COOLDOWN_CONF: @@ -868,6 +875,7 @@ public class EditLog { case OperationType.OP_MODIFY_TABLE_LIGHT_SCHEMA_CHANGE: { final TableAddOrDropColumnsInfo info = (TableAddOrDropColumnsInfo) journal.getData(); env.getSchemaChangeHandler().replayModifyTableLightSchemaChange(info); + env.getBinlogManager().addModifyTableAddOrDropColumns(info, logId); break; } case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: { @@ -1198,7 +1206,9 @@ public class EditLog { } public void logCreateTable(CreateTableInfo info) { - logEdit(OperationType.OP_CREATE_TABLE, info); + long logId = logEdit(OperationType.OP_CREATE_TABLE, info); + CreateTableRecord record = new CreateTableRecord(logId, info); + Env.getCurrentEnv().getBinlogManager().addCreateTableRecord(record); } public void logRefreshExternalTableSchema(RefreshExternalTableInfo info) { @@ -1235,7 +1245,9 @@ public class EditLog { } public void logDropTable(DropInfo info) { - logEdit(OperationType.OP_DROP_TABLE, info); + long logId = logEdit(OperationType.OP_DROP_TABLE, info); + DropTableRecord record = new DropTableRecord(logId, info); + Env.getCurrentEnv().getBinlogManager().addDropTableRecord(record); } public void logEraseTable(long tableId) { @@ -1586,7 +1598,8 @@ public class EditLog { } public void logAlterJob(AlterJobV2 alterJob) { - logEdit(OperationType.OP_ALTER_JOB_V2, alterJob); + long logId = logEdit(OperationType.OP_ALTER_JOB_V2, alterJob); + Env.getCurrentEnv().getBinlogManager().addAlterJobV2(alterJob, logId); } public void logUpdateCooldownConf(CooldownConfList cooldownConf) { @@ -1766,7 +1779,8 @@ public class EditLog { } public void logModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info) { - logEdit(OperationType.OP_MODIFY_TABLE_LIGHT_SCHEMA_CHANGE, info); + long logId = logEdit(OperationType.OP_MODIFY_TABLE_LIGHT_SCHEMA_CHANGE, info); + Env.getCurrentEnv().getBinlogManager().addModifyTableAddOrDropColumns(info, logId); } public void logModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropColumnsInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropColumnsInfo.java index c6cdd2ce9e..9939608067 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropColumnsInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropColumnsInfo.java @@ -46,9 +46,12 @@ public class TableAddOrDropColumnsInfo implements Writable { private List<Index> indexes; @SerializedName(value = "jobId") private long jobId; + @SerializedName(value = "rawSql") + private String rawSql; - public TableAddOrDropColumnsInfo(long dbId, long tableId, + public TableAddOrDropColumnsInfo(String rawSql, long dbId, long tableId, Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes, long jobId) { + this.rawSql = rawSql; this.dbId = dbId; this.tableId = tableId; this.indexSchemaMap = indexSchemaMap; @@ -112,4 +115,8 @@ public class TableAddOrDropColumnsInfo implements Writable { sb.append(" jobId: ").append(jobId); return sb.toString(); } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java index 57c7c5ede0..efdc3ab6e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java @@ -50,11 +50,14 @@ public class TableAddOrDropInvertedIndicesInfo implements Writable { private boolean isDropInvertedIndex; @SerializedName(value = "jobId") private long jobId; + @SerializedName(value = "rawSql") + private String rawSql; - public TableAddOrDropInvertedIndicesInfo(long dbId, long tableId, + public TableAddOrDropInvertedIndicesInfo(String rawSql, long dbId, long tableId, Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes, List<Index> alterInvertedIndexes, boolean isDropInvertedIndex, long jobId) { + this.rawSql = rawSql; this.dbId = dbId; this.tableId = tableId; this.indexSchemaMap = indexSchemaMap; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 0ea0f7d839..6d9a6a4615 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -481,6 +481,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { List<Index> newIndexes = olapTable.getCopiedIndexes(); long jobId = Env.getCurrentEnv().getNextId(); Env.getCurrentEnv().getSchemaChangeHandler().modifyTableLightSchemaChange( + "", db, olapTable, indexSchemaMap, newIndexes, null, false, jobId, false); } else { throw new MetaNotFoundException("table_id " @@ -2523,6 +2524,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { Map<String, String> properties = request.getProperties(); RestoreStmt restoreStmt = new RestoreStmt(label, repoName, null, properties, request.getMeta(), request.getJobInfo()); + restoreStmt.disableDynamicPartition(); LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt); try { ConnectContext ctx = ConnectContext.get(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 8830f3812b..9c60846a4d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -327,7 +327,7 @@ public class RollupJobV2Test { Column column = new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", ""); columns.add(column); - RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, null, 1, 1, + RollupJobV2 rollupJobV2 = new RollupJobV2("", 1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, null, 1, 1, KeysType.AGG_KEYS, keysCount, new OriginStatement("create materialized view rollup as select bitmap_union(to_bitmap(c1)) from test", 0)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index 954bc63706..2d3a9aac5a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -399,7 +399,7 @@ public class SchemaChangeJobV2Test { file.deleteOnExit(); DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); - SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2(1, 1, 1, "test", 600000); + SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2("", 1, 1, 1, "test", 600000); schemaChangeJobV2.setStorageFormat(TStorageFormat.V2); Deencapsulation.setField(schemaChangeJobV2, "jobState", AlterJobV2.JobState.FINISHED); Map<Long, SchemaVersionAndHash> indexSchemaVersionAndHashMap = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/TableAddOrDropColumnsInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/TableAddOrDropColumnsInfoTest.java index e849198a5b..1c78f24cbf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/TableAddOrDropColumnsInfoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/TableAddOrDropColumnsInfoTest.java @@ -67,7 +67,7 @@ public class TableAddOrDropColumnsInfoTest { List<Index> indexes = Lists.newArrayList( new Index(0, "index", Lists.newArrayList("testCol1"), IndexDef.IndexType.BITMAP, null, "xxxxxx")); - TableAddOrDropColumnsInfo tableAddOrDropColumnsInfo1 = new TableAddOrDropColumnsInfo(dbId, tableId, + TableAddOrDropColumnsInfo tableAddOrDropColumnsInfo1 = new TableAddOrDropColumnsInfo("", dbId, tableId, indexSchemaMap, indexes, jobId); String c1Json = GsonUtils.GSON.toJson(tableAddOrDropColumnsInfo1); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 9c84ba6906..51dadbd8b8 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -960,6 +960,8 @@ enum TBinlogType { CREATE_TABLE = 2, DROP_PARTITION = 3, DROP_TABLE = 4, + ALTER_JOB = 5, + MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 6, } struct TBinlog { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org