This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0-alpha in repository https://gitbox.apache.org/repos/asf/doris.git
commit de36bf3c074d602021f78c5803d5ca8edab861bb Author: 奕冷 <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Thu Apr 20 19:20:03 2023 +0800 [fix](editLog) add sufficient replay logic and edit log for altering light schema change (#18746) --- ...CHelper.java => AlterLightSchChangeHelper.java} | 52 ++++++++--------- .../apache/doris/alter/SchemaChangeHandler.java | 19 +++++- .../org/apache/doris/catalog/TableProperty.java | 3 - .../org/apache/doris/journal/JournalEntity.java | 7 ++- .../doris/persist/AlterLightSchemaChangeInfo.java | 68 ++++++++++++++++++++++ .../java/org/apache/doris/persist/EditLog.java | 8 ++- 6 files changed, 123 insertions(+), 34 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLSCHelper.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java similarity index 87% rename from fe/fe-core/src/main/java/org/apache/doris/alter/AlterLSCHelper.java rename to fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java index 1b808e10ea..d643aec58b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLSCHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java @@ -28,7 +28,7 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; -import org.apache.doris.persist.ModifyTablePropertyOperationLog; +import org.apache.doris.persist.AlterLightSchemaChangeInfo; import org.apache.doris.proto.InternalService.PFetchColIdsRequest; import org.apache.doris.proto.InternalService.PFetchColIdsRequest.Builder; import org.apache.doris.proto.InternalService.PFetchColIdsRequest.PFetchColIdParam; @@ -42,6 +42,8 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Preconditions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; import java.util.ArrayList; @@ -59,13 +61,15 @@ import java.util.concurrent.TimeoutException; /** * For alter light_schema_change table property */ -public class AlterLSCHelper { +public class AlterLightSchChangeHelper { + + private static final Logger LOG = LogManager.getLogger(AlterLightSchChangeHelper.class); private final Database db; private final OlapTable olapTable; - public AlterLSCHelper(Database db, OlapTable olapTable) { + public AlterLightSchChangeHelper(Database db, OlapTable olapTable) { this.db = db; this.olapTable = olapTable; } @@ -77,9 +81,10 @@ public class AlterLSCHelper { */ public void enableLightSchemaChange() throws DdlException { final Map<Long, PFetchColIdsRequest> params = initParams(); - final PFetchColIdsResponse response = callForColumnIds(params); - updateTableMeta(response); - modifyTableProperty(); + final AlterLightSchemaChangeInfo info = callForColumnsInfo(params); + updateTableMeta(info); + Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(info); + LOG.info("successfully enable `light_schema_change`"); } /** @@ -133,10 +138,12 @@ public class AlterLSCHelper { /** * @param beIdToRequest rpc param for corresponding BEs - * @return indexIds to each tablet schema info which consists of columnName to corresponding column unique id pairs + * @return contains indexIds to each tablet schema info which consists of columnName to corresponding + * column unique id pairs * @throws DdlException as a wrapper for rpc failures */ - private PFetchColIdsResponse callForColumnIds(Map<Long, PFetchColIdsRequest> beIdToRequest) throws DdlException { + private AlterLightSchemaChangeInfo callForColumnsInfo(Map<Long, PFetchColIdsRequest> beIdToRequest) + throws DdlException { final List<Future<PFetchColIdsResponse>> futureList = new ArrayList<>(); // start a rpc in a pipeline way try { @@ -173,14 +180,14 @@ public class AlterLSCHelper { } catch (TimeoutException e) { throw new DdlException("fetch columnIds RPC result timeout", e); } - return compactToUniqResp(resultList); + return compactToAlterLscInfo(resultList); } /** * Since the result collected from several BEs may contain repeated indexes in distributed storage scenarios, * we should do consistency check for the result for the same index, and get the unique result. */ - private PFetchColIdsResponse compactToUniqResp(List<PFetchColIdsResponse> resultList) { + private AlterLightSchemaChangeInfo compactToAlterLscInfo(List<PFetchColIdsResponse> resultList) { final PFetchColIdsResponse.Builder builder = PFetchColIdsResponse.newBuilder(); Map<Long, Map<String, Integer>> indexIdToTabletInfo = new HashMap<>(); resultList.forEach(response -> { @@ -197,27 +204,25 @@ public class AlterLSCHelper { "index: " + indexId + "got inconsistent schema in storage"); } }); - return builder.build(); + return new AlterLightSchemaChangeInfo(db.getId(), olapTable.getId(), indexIdToTabletInfo); } - private void updateTableMeta(PFetchColIdsResponse response) throws DdlException { - Preconditions.checkState(response.isInitialized()); + public void updateTableMeta(AlterLightSchemaChangeInfo info) throws DdlException { + Preconditions.checkNotNull(info, "passed in info should be not null"); // update index-meta once and for all // schema pair: <maxColId, columns> final List<Pair<Integer, List<Column>>> schemaPairs = new ArrayList<>(); final List<Long> indexIds = new ArrayList<>(); - response.getEntriesList().forEach(entry -> { - final long indexId = entry.getIndexId(); + info.getIndexIdToColumnInfo().forEach((indexId, colNameToId) -> { final List<Column> columns = olapTable.getSchemaByIndexId(indexId, true); - final Map<String, Integer> colNameToId = entry.getColNameToIdMap(); Preconditions.checkState(columns.size() == colNameToId.size(), - "size mismatch for columns meta from BE"); + "size mismatch for original columns meta and that in change info"); int maxColId = Column.COLUMN_UNIQUE_ID_INIT_VALUE; final List<Column> newSchema = new ArrayList<>(); for (Column column : columns) { final String columnName = column.getName(); final int columnId = Preconditions.checkNotNull(colNameToId.get(columnName), - "failed to fetch column id of column:{" + columnName + "} from BE"); + "failed to fetch column id of column:{" + columnName + "}"); final Column newColumn = new Column(column); newColumn.setUniqueId(columnId); newSchema.add(newColumn); @@ -226,7 +231,8 @@ public class AlterLSCHelper { schemaPairs.add(Pair.of(maxColId, newSchema)); indexIds.add(indexId); }); - Preconditions.checkState(schemaPairs.size() == indexIds.size()); + Preconditions.checkState(schemaPairs.size() == indexIds.size(), + "impossible state, size of schemaPairs and indexIds should be the same"); // update index-meta once and for all try { for (int i = 0; i < indexIds.size(); i++) { @@ -238,14 +244,8 @@ public class AlterLSCHelper { } catch (IOException e) { throw new DdlException("fail to reset index schema", e); } - } - - private void modifyTableProperty() { // write table property olapTable.setEnableLightSchemaChange(true); - //write edit log - ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), olapTable.getId(), - olapTable.getTableProperty().getProperties()); - Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(info); + LOG.info("successfully update table meta for `light_schema_change`"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index ffce0bbb9b..c9ce361e3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -73,6 +73,7 @@ import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.AlterLightSchemaChangeInfo; import org.apache.doris.persist.RemoveAlterJobV2OperationLog; import org.apache.doris.persist.TableAddOrDropColumnsInfo; import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo; @@ -1890,8 +1891,22 @@ public class SchemaChangeHandler extends AlterHandler { private void enableLightSchemaChange(Database db, OlapTable olapTable) throws DdlException { - final AlterLSCHelper alterLSCHelper = new AlterLSCHelper(db, olapTable); - alterLSCHelper.enableLightSchemaChange(); + final AlterLightSchChangeHelper alterLightSchChangeHelper = new AlterLightSchChangeHelper(db, olapTable); + alterLightSchChangeHelper.enableLightSchemaChange(); + } + + public void replayAlterLightSchChange(AlterLightSchemaChangeInfo info) throws MetaNotFoundException { + Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(info.getDbId()); + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP); + olapTable.writeLock(); + final AlterLightSchChangeHelper alterLightSchChangeHelper = new AlterLightSchChangeHelper(db, olapTable); + try { + alterLightSchChangeHelper.updateTableMeta(info); + } catch (DdlException e) { + LOG.warn("failed to replay alter light schema change", e); + } finally { + olapTable.writeUnlock(); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index add5162d75..4078b7473d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -107,9 +107,6 @@ public class TableProperty implements Writable { buildInMemory(); buildStoragePolicy(); break; - case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: - buildEnableLightSchemaChange(); - break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index ccbbd764a3..2162fc9702 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -61,6 +61,7 @@ import org.apache.doris.mtmv.metadata.DropMTMVTask; import org.apache.doris.mtmv.metadata.MTMVJob; import org.apache.doris.mtmv.metadata.MTMVTask; import org.apache.doris.mysql.privilege.UserPropertyInfo; +import org.apache.doris.persist.AlterLightSchemaChangeInfo; import org.apache.doris.persist.AlterMultiMaterializedView; import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.persist.AlterUserOperationLog; @@ -625,7 +626,6 @@ public class JournalEntity implements Writable { } case OperationType.OP_DYNAMIC_PARTITION: case OperationType.OP_MODIFY_IN_MEMORY: - case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: case OperationType.OP_MODIFY_REPLICATION_NUM: { data = ModifyTablePropertyOperationLog.read(in); isRead = true; @@ -808,6 +808,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: { + data = AlterLightSchemaChangeInfo.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterLightSchemaChangeInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterLightSchemaChangeInfo.java new file mode 100644 index 0000000000..e76ebe9d64 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterLightSchemaChangeInfo.java @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +public class AlterLightSchemaChangeInfo implements Writable { + + @SerializedName(value = "dbId") + private Long dbId; + + @SerializedName(value = "tableId") + private Long tableId; + + @SerializedName("indexIdToColumnInfo") + private Map<Long, Map<String, Integer>> indexIdToColumnInfo; + + public AlterLightSchemaChangeInfo(long dbId, long tableId, Map<Long, Map<String, Integer>> info) { + this.dbId = dbId; + this.tableId = tableId; + this.indexIdToColumnInfo = info; + } + + public Long getDbId() { + return dbId; + } + + public Long getTableId() { + return tableId; + } + + public Map<Long, Map<String, Integer>> getIndexIdToColumnInfo() { + return indexIdToColumnInfo; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static AlterLightSchemaChangeInfo read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), AlterLightSchemaChangeInfo.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index fc837efd70..63d3f348aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -765,7 +765,6 @@ public class EditLog { } case OperationType.OP_DYNAMIC_PARTITION: case OperationType.OP_MODIFY_IN_MEMORY: - case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: case OperationType.OP_MODIFY_REPLICATION_NUM: { ModifyTablePropertyOperationLog log = (ModifyTablePropertyOperationLog) journal.getData(); env.replayModifyTableProperty(opCode, log); @@ -897,6 +896,11 @@ public class EditLog { env.getSchemaChangeHandler().replayModifyTableLightSchemaChange(info); break; } + case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: { + final AlterLightSchemaChangeInfo info = (AlterLightSchemaChangeInfo) journal.getData(); + env.getSchemaChangeHandler().replayAlterLightSchChange(info); + break; + } case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES: { final TableAddOrDropInvertedIndicesInfo info = (TableAddOrDropInvertedIndicesInfo) journal.getData(); @@ -1607,7 +1611,7 @@ public class EditLog { logEdit(OperationType.OP_MODIFY_IN_MEMORY, info); } - public void logAlterLightSchemaChange(ModifyTablePropertyOperationLog info) { + public void logAlterLightSchemaChange(AlterLightSchemaChangeInfo info) { logEdit(OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE, info); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org