This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new d250200aba [enhancement](table-meta) flush column unique ids for tables before 1.2 automatically (#23616) (#23991) d250200aba is described below commit d250200abaecac5715d7484b041aca1c8767528d Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Wed Sep 6 19:23:31 2023 +0800 [enhancement](table-meta) flush column unique ids for tables before 1.2 automatically (#23616) (#23991) --- docs/en/docs/admin-manual/config/fe-config.md | 7 + .../SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md | 62 +++++++ docs/zh-CN/docs/admin-manual/config/fe-config.md | 6 + .../SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md | 62 +++++++ .../main/java/org/apache/doris/common/Config.java | 10 ++ fe/fe-core/src/main/cup/sql_parser.cup | 7 +- .../doris/alter/AlterLightSchChangeHelper.java | 47 +++--- .../apache/doris/alter/SchemaChangeHandler.java | 9 +- .../apache/doris/analysis/ShowConvertLSCStmt.java | 69 ++++++++ .../apache/doris/catalog/ColumnIdFlushDaemon.java | 180 +++++++++++++++++++++ .../main/java/org/apache/doris/catalog/Env.java | 8 + .../java/org/apache/doris/qe/ShowExecutor.java | 43 +++++ fe/fe-core/src/main/jflex/sql_scanner.flex | 1 + 13 files changed, 484 insertions(+), 27 deletions(-) diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index 7f11a4a99c..eafe9938c0 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -2752,3 +2752,10 @@ Default: 4 This variable indicates the number of digits by which to increase the scale of the result of division operations performed with the `/` operator. + +#### `enable_convert_light_weight_schema_change` + +Default:true + +Temporary configuration option. After it is enabled, a background thread will be started to automatically modify all olap tables to light schema change. 
The modification results can be viewed through the command `show convert_light_schema_change_process [from db]`, and the conversion results of all non-light schema change tables will be displayed. + diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md new file mode 100644 index 0000000000..91918c4452 --- /dev/null +++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md @@ -0,0 +1,62 @@ +--- +{ + "title": "SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS", + "language": "en" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS + +### Name + +SHOW CONVERT LIGHT SCHEMA CHANGE PROCESS + +### Description + +This statement is used to show the progress of converting non-light-schema-change olap tables to light schema change tables. The config `enable_convert_light_weight_schema_change` must be enabled. + +grammar: + +```sql +SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS [FROM DATABASE db] +``` + +### Example + +1. View the converting process in db named test + + ```sql + SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS FROM DATABASE test; + ```` + +2.
View the converting process globally + + ```sql + SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS; + ``` + + +### Keywords + + SHOW, CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS + +### Best Practice \ No newline at end of file diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index fc71a939a4..e8be232bfc 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -2754,3 +2754,9 @@ show data (其他用法:HELP SHOW DATA) 默认值:4 此变量表示增加与/运算符执行的除法操作结果规模的位数。默认为4。 + +#### `enable_convert_light_weight_schema_change` + +默认值:true + +暂时性配置项,开启后会启动后台线程自动将所有的olap表修改为可light schema change,修改结果可通过命令`show convert_light_schema_change_process [from db]` 来查看,将会展示所有非light schema change表的转换结果 \ No newline at end of file diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md new file mode 100644 index 0000000000..7d084a5515 --- /dev/null +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md @@ -0,0 +1,62 @@ +--- +{ + "title": "SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS", + "language": "zh-CN" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied.
See the License for the +specific language governing permissions and limitations +under the License. +--> + +## SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS + +### Name + +SHOW CONVERT LIGHT SCHEMA CHANGE PROCESS + +### Description + +用来查看将非light schema change的olap表转换为light schema change表的情况, 需要开启配置`enable_convert_light_weight_schema_change` + +语法: + +```sql +SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS [FROM DATABASE db] +``` + +### Example + +1. 查看在database test上的转换情况 + + ```sql + SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS FROM DATABASE test; + ```` + +2. 查看全局的转换情况 + + ```sql + SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS; + ``` + + +### Keywords + + SHOW, CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS + +### Best Practice \ No newline at end of file diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 705c93b8c3..b95139d3b1 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2100,4 +2100,14 @@ public class Config extends ConfigBase { @ConfField public static boolean forbid_running_alter_job = false; + + @ConfField + public static int table_stats_health_threshold = 80; + + @ConfField(description = { + "暂时性配置项,开启后会自动将所有的olap表修改为可light schema change", + "temporary config filed, will make all olap tables enable light schema change" + }) + public static boolean enable_convert_light_weight_schema_change = true; + } diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 39fa006140..dd8dceee64 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -640,7 +640,8 @@ terminal String KW_PREPARE, KW_EXECUTE, KW_LINES, - KW_IGNORE; + KW_IGNORE, + KW_CONVERT_LSC; terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, LBRACE, RBRACE, DIVIDE, MOD, ADD, SUBTRACT, PLACEHOLDER, ARROW;
terminal BITAND, BITOR, BITXOR, BITNOT; @@ -4158,6 +4159,10 @@ show_param ::= {: RESULT = new ShowBuildIndexStmt(db, parser.where, orderByClause, limitClause); :} + | KW_CONVERT_LSC KW_FROM opt_db:db + {: + RESULT = new ShowConvertLSCStmt(db); + :} ; opt_tmp ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java index 6dab84bf80..1e77b0a5ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java @@ -26,7 +26,6 @@ import org.apache.doris.catalog.MaterializedIndexMeta; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Tablet; -import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.persist.AlterLightSchemaChangeInfo; import org.apache.doris.proto.InternalService.PFetchColIdsRequest; @@ -42,8 +41,8 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Preconditions; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.ArrayList; @@ -52,6 +51,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -79,12 +79,11 @@ public class AlterLightSchChangeHelper { * 2. refresh table metadata * 3. 
write edit log */ - public void enableLightSchemaChange() throws DdlException { - final Map<Long, PFetchColIdsRequest> params = initParams(); - final AlterLightSchemaChangeInfo info = callForColumnsInfo(params); + public void enableLightSchemaChange() throws IllegalStateException { + final AlterLightSchemaChangeInfo info = callForColumnsInfo(); updateTableMeta(info); Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(info); - LOG.info("successfully enable `light_schema_change`"); + LOG.info("successfully enable `light_schema_change`, db={}, tbl={}", db.getFullName(), olapTable.getName()); } /** @@ -137,35 +136,36 @@ public class AlterLightSchChangeHelper { } /** - * @param beIdToRequest rpc param for corresponding BEs * @return contains indexIds to each tablet schema info which consists of columnName to corresponding * column unique id pairs - * @throws DdlException as a wrapper for rpc failures + * @throws IllegalStateException as a wrapper for rpc failures */ - private AlterLightSchemaChangeInfo callForColumnsInfo(Map<Long, PFetchColIdsRequest> beIdToRequest) - throws DdlException { - final List<Future<PFetchColIdsResponse>> futureList = new ArrayList<>(); - // start a rpc in a pipeline way + public AlterLightSchemaChangeInfo callForColumnsInfo() + throws IllegalStateException { + Map<Long, PFetchColIdsRequest> beIdToRequest = initParams(); + Map<Long, Future<PFetchColIdsResponse>> beIdToRespFuture = new HashMap<>(); try { for (Long beId : beIdToRequest.keySet()) { final Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beId); - final TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); + final TNetworkAddress address = + new TNetworkAddress(Objects.requireNonNull(backend).getHost(), backend.getBrpcPort()); final Future<PFetchColIdsResponse> responseFuture = BackendServiceProxy.getInstance() .getColumnIdsByTabletIds(address, beIdToRequest.get(beId)); - futureList.add(responseFuture); + 
beIdToRespFuture.put(beId, responseFuture); } } catch (RpcException e) { - throw new DdlException("fetch columnIds RPC failed", e); + throw new IllegalStateException("fetch columnIds RPC failed", e); } // wait for and get results final long start = System.currentTimeMillis(); long timeoutMs = ConnectContext.get().getExecTimeout() * 1000L; final List<PFetchColIdsResponse> resultList = new ArrayList<>(); try { - for (Future<PFetchColIdsResponse> future : futureList) { - final PFetchColIdsResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS); + for (Map.Entry<Long, Future<PFetchColIdsResponse>> entry : beIdToRespFuture.entrySet()) { + final PFetchColIdsResponse response = entry.getValue().get(timeoutMs, TimeUnit.MILLISECONDS); if (response.getStatus().getStatusCode() != TStatusCode.OK.getValue()) { - throw new DdlException(response.getStatus().getErrorMsgs(0)); + throw new IllegalStateException(String.format("fail to get column info from be: %s, msg:%s", + entry.getKey(), response.getStatus().getErrorMsgs(0))); } resultList.add(response); // refresh the timeout @@ -176,9 +176,9 @@ public class AlterLightSchChangeHelper { "impossible state, timeout should happened"); } } catch (InterruptedException | ExecutionException e) { - throw new DdlException("fetch columnIds RPC result failed: ", e); + throw new IllegalStateException("fetch columnIds RPC result failed: ", e); } catch (TimeoutException e) { - throw new DdlException("fetch columnIds RPC result timeout", e); + throw new IllegalStateException("fetch columnIds RPC result timeout", e); } return compactToAlterLscInfo(resultList); } @@ -207,7 +207,7 @@ public class AlterLightSchChangeHelper { return new AlterLightSchemaChangeInfo(db.getId(), olapTable.getId(), indexIdToTabletInfo); } - public void updateTableMeta(AlterLightSchemaChangeInfo info) throws DdlException { + public void updateTableMeta(AlterLightSchemaChangeInfo info) throws IllegalStateException { Preconditions.checkNotNull(info, "passed in info 
should be not null"); // update index-meta once and for all // schema pair: <maxColId, columns> @@ -242,10 +242,9 @@ public class AlterLightSchChangeHelper { indexMeta.setSchema(schemaPair.second); } } catch (IOException e) { - throw new DdlException("fail to reset index schema", e); + throw new IllegalStateException("fail to reset index schema", e); } // write table property olapTable.setEnableLightSchemaChange(true); - LOG.info("successfully update table meta for `light_schema_change`"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 16c3da53d4..3de1d43163 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2014,7 +2014,12 @@ public class SchemaChangeHandler extends AlterHandler { private void enableLightSchemaChange(Database db, OlapTable olapTable) throws DdlException { final AlterLightSchChangeHelper alterLightSchChangeHelper = new AlterLightSchChangeHelper(db, olapTable); - alterLightSchChangeHelper.enableLightSchemaChange(); + try { + alterLightSchChangeHelper.enableLightSchemaChange(); + } catch (IllegalStateException e) { + throw new DdlException(String.format("failed to enable light schema change for table %s.%s", + db.getFullName(), olapTable.getName()), e); + } } public void replayAlterLightSchChange(AlterLightSchemaChangeInfo info) throws MetaNotFoundException { @@ -2024,7 +2029,7 @@ public class SchemaChangeHandler extends AlterHandler { final AlterLightSchChangeHelper alterLightSchChangeHelper = new AlterLightSchChangeHelper(db, olapTable); try { alterLightSchChangeHelper.updateTableMeta(info); - } catch (DdlException e) { + } catch (IllegalStateException e) { LOG.warn("failed to replay alter light schema change", e); } finally { olapTable.writeUnlock(); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowConvertLSCStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowConvertLSCStmt.java new file mode 100644 index 0000000000..bab2e3b34b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowConvertLSCStmt.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSetMetaData; + +public class ShowConvertLSCStmt extends ShowStmt { + + private final String dbName; + + public ShowConvertLSCStmt(String dbName) { + this.dbName = dbName; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN) + && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), + PrivPredicate.OPERATOR)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN/OPERATOR"); + } + } + + @Override + public ShowResultSetMetaData getMetaData() { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + Column databaseColumn = new Column("database", ScalarType.createVarcharType(30)); + Column tableNameColumn = new Column("table", ScalarType.createVarcharType(30)); + Column statusColum = new Column("status", ScalarType.createVarcharType(30)); + builder.addColumn(databaseColumn); + builder.addColumn(tableNameColumn); + builder.addColumn(statusColum); + return builder.build(); + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.FORWARD_NO_SYNC; + } + + public String getDbName() { + return dbName; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnIdFlushDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnIdFlushDaemon.java new file mode 100644 index 0000000000..28007a8e66 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnIdFlushDaemon.java @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.alter.AlterLightSchChangeHelper; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.persist.AlterLightSchemaChangeInfo; + +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; + +/** + * note(tsy): this class is temporary, make table before 1.2 to enable light schema change + */ +public class ColumnIdFlushDaemon extends MasterDaemon { + + private static final Logger LOG = LogManager.getLogger(ColumnIdFlushDaemon.class); + + /** + * db name -> (tbl name -> status) + */ + private final Map<String, Map<String, FlushStatus>> resultCollector; + + private final ReadWriteLock rwLock; + + private final BiConsumer<Database, OlapTable> flushFunc; + + public 
ColumnIdFlushDaemon() { + super("colum-id-flusher", TimeUnit.HOURS.toMillis(1)); + resultCollector = Maps.newHashMap(); + rwLock = new ReentrantReadWriteLock(); + if (Config.enable_convert_light_weight_schema_change) { + flushFunc = this::doFlush; + } else { + flushFunc = (db, table) -> record(db.getFullName(), table.getName(), FlushStatus.init()); + } + } + + @Override + protected void runAfterCatalogReady() { + flush(); + } + + private void flush() { + List<Database> dbs = Env.getCurrentEnv().getInternalCatalog().getDbs(); + for (Database db : dbs) { + rwLock.writeLock().lock(); + try { + db.getTables() + .stream() + .filter(table -> table instanceof OlapTable) + .map(table -> (OlapTable) table) + .filter(olapTable -> !olapTable.getTableProperty().getUseSchemaLightChange()) + .forEach(table -> flushFunc.accept(db, table)); + } finally { + rwLock.writeLock().unlock(); + } + try { + // avoid too often to call be + sleep(3000); + } catch (InterruptedException ignore) { + // do nothing + } + } + } + + private void doFlush(Database db, OlapTable table) { + record(db.getFullName(), table.getName(), FlushStatus.init()); + AlterLightSchChangeHelper schChangeHelper = new AlterLightSchChangeHelper(db, table); + AlterLightSchemaChangeInfo changeInfo; + try { + changeInfo = schChangeHelper.callForColumnsInfo(); + } catch (IllegalStateException e) { + record(db.getFullName(), table.getName(), FlushStatus.failed(e.getMessage())); + return; + } + table.writeLock(); + try { + if (table.getTableProperty().getUseSchemaLightChange()) { + removeRecord(db.getFullName(), table.getName()); + return; + } + schChangeHelper.updateTableMeta(changeInfo); + Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(changeInfo); + LOG.info("successfully enable `light_schema_change`, db={}, tbl={}", + db.getFullName(), table.getName()); + removeRecord(db.getFullName(), table.getName()); + } catch (IllegalStateException e) { + record(db.getFullName(), table.getName(), 
FlushStatus.failed(e.getMessage())); + } finally { + table.writeUnlock(); + } + } + + private void record(String dbName, String tableName, FlushStatus status) { + resultCollector.putIfAbsent(dbName, Maps.newHashMap()); + Map<String, FlushStatus> tableToStatus = resultCollector.get(dbName); + tableToStatus.put(tableName, status); + } + + private void removeRecord(String dbName, String tableName) { + Map<String, FlushStatus> tableToStatus; + if (resultCollector.containsKey(dbName) + && (tableToStatus = resultCollector.get(dbName)).containsKey(tableName)) { + tableToStatus.remove(tableName); + if (tableToStatus.isEmpty()) { + resultCollector.remove(dbName); + } + } + } + + public Map<String, Map<String, FlushStatus>> getResultCollector() { + return resultCollector; + } + + public void readLock() { + rwLock.readLock().lock(); + } + + public void readUnlock() { + rwLock.readLock().unlock(); + } + + public static class FlushStatus { + + private FlushStatus() { + this.success = true; + this.msg = "Waiting to be converted"; + } + + private FlushStatus(String msg) { + this.success = false; + this.msg = msg; + } + + public static FlushStatus init() { + return new FlushStatus(); + } + + public static FlushStatus failed(String reason) { + return new FlushStatus(reason); + } + + public boolean isSuccess() { + return success; + } + + public String getMsg() { + return msg; + } + + private final boolean success; + + private final String msg; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 554712db8d..9fbed8b63d 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -333,6 +333,8 @@ public class Env { private Daemon timePrinter; private Daemon listener; + private ColumnIdFlushDaemon columnIdFlusher; + private boolean isFirstTimeStartUp = false; private boolean isElectable; // set to true after finished 
replay all meta and ready to serve @@ -674,6 +676,7 @@ public class Env { this.hiveTransactionMgr = new HiveTransactionMgr(); this.binlogManager = new BinlogManager(); this.binlogGcer = new BinlogGcer(); + this.columnIdFlusher = new ColumnIdFlushDaemon(); } public static void destroyCheckpoint() { @@ -1503,6 +1506,7 @@ public class Env { // binlog gcer binlogGcer.start(); + columnIdFlusher.start(); } // start threads that should running on all FE @@ -5505,4 +5509,8 @@ public class Env { queryStats.clear(info); editLog.logCleanQueryStats(info); } + + public ColumnIdFlushDaemon getColumnIdFlusher() { + return columnIdFlusher; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 9c3e1b1b31..0ebcb1c7dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -40,6 +40,7 @@ import org.apache.doris.analysis.ShowCollationStmt; import org.apache.doris.analysis.ShowColumnHistStmt; import org.apache.doris.analysis.ShowColumnStatsStmt; import org.apache.doris.analysis.ShowColumnStmt; +import org.apache.doris.analysis.ShowConvertLSCStmt; import org.apache.doris.analysis.ShowCreateCatalogStmt; import org.apache.doris.analysis.ShowCreateDbStmt; import org.apache.doris.analysis.ShowCreateFunctionStmt; @@ -110,6 +111,7 @@ import org.apache.doris.backup.Repository; import org.apache.doris.backup.RestoreJob; import org.apache.doris.blockrule.SqlBlockRule; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ColumnIdFlushDaemon; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.DynamicPartitionProperty; @@ -236,6 +238,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import 
java.util.Set; @@ -427,6 +430,8 @@ public class ShowExecutor { handleShowBuildIndexStmt(); } else if (stmt instanceof ShowAnalyzeTaskStatus) { handleShowAnalyzeTaskStatus(); + } else if (stmt instanceof ShowConvertLSCStmt) { + handleShowConvertLSC(); } else { handleEmtpy(); } @@ -2828,5 +2833,43 @@ public class ShowExecutor { resultSet = new ShowResultSet(showStmt.getMetaData(), rows); } + + private void handleShowConvertLSC() { + ShowConvertLSCStmt showStmt = (ShowConvertLSCStmt) stmt; + ColumnIdFlushDaemon columnIdFlusher = Env.getCurrentEnv().getColumnIdFlusher(); + columnIdFlusher.readLock(); + List<List<String>> rows; + try { + Map<String, Map<String, ColumnIdFlushDaemon.FlushStatus>> resultCollector = + columnIdFlusher.getResultCollector(); + rows = new ArrayList<>(); + String db = ((ShowConvertLSCStmt) stmt).getDbName(); + if (db != null) { + Map<String, ColumnIdFlushDaemon.FlushStatus> tblNameToStatus = resultCollector.get(db); + if (tblNameToStatus != null) { + tblNameToStatus.forEach((tblName, status) -> { + List<String> row = new ArrayList<>(); + row.add(db); + row.add(tblName); + row.add(status.getMsg()); + rows.add(row); + }); + } + } else { + resultCollector.forEach((dbName, tblNameToStatus) -> + tblNameToStatus.forEach((tblName, status) -> { + List<String> row = new ArrayList<>(); + row.add(dbName); + row.add(tblName); + row.add(status.getMsg()); + rows.add(row); + })); + } + } finally { + columnIdFlusher.readUnlock(); + } + resultSet = new ShowResultSet(showStmt.getMetaData(), rows); + } + } diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index cc4c11607b..56f8faae18 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -493,6 +493,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES)); keywordMap.put("ignore", new Integer(SqlParserSymbols.KW_IGNORE)); keywordMap.put("expired", new 
Integer(SqlParserSymbols.KW_EXPIRED)); + keywordMap.put("convert_light_schema_change_process", new Integer(SqlParserSymbols.KW_CONVERT_LSC)); } // map from token id to token description --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org