This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new d250200aba [enhancement](table-meta) flush column unique ids for 
tables before 1.2 automatically  (#23616) (#23991)
d250200aba is described below

commit d250200abaecac5715d7484b041aca1c8767528d
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Wed Sep 6 19:23:31 2023 +0800

    [enhancement](table-meta) flush column unique ids for tables before 1.2 
automatically  (#23616) (#23991)
---
 docs/en/docs/admin-manual/config/fe-config.md      |   7 +
 .../SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md    |  62 +++++++
 docs/zh-CN/docs/admin-manual/config/fe-config.md   |   6 +
 .../SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md    |  62 +++++++
 .../main/java/org/apache/doris/common/Config.java  |  10 ++
 fe/fe-core/src/main/cup/sql_parser.cup             |   7 +-
 .../doris/alter/AlterLightSchChangeHelper.java     |  47 +++---
 .../apache/doris/alter/SchemaChangeHandler.java    |   9 +-
 .../apache/doris/analysis/ShowConvertLSCStmt.java  |  69 ++++++++
 .../apache/doris/catalog/ColumnIdFlushDaemon.java  | 180 +++++++++++++++++++++
 .../main/java/org/apache/doris/catalog/Env.java    |   8 +
 .../java/org/apache/doris/qe/ShowExecutor.java     |  43 +++++
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   1 +
 13 files changed, 484 insertions(+), 27 deletions(-)

diff --git a/docs/en/docs/admin-manual/config/fe-config.md 
b/docs/en/docs/admin-manual/config/fe-config.md
index 7f11a4a99c..eafe9938c0 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2752,3 +2752,10 @@ Default: 4
 
 This variable indicates the number of digits by which to increase the scale of 
the result of
 division operations performed with the `/` operator.
+
+#### `enable_convert_light_weight_schema_change`
+
+Default:true
+
+Temporary configuration option. After it is enabled, a background thread will 
be started to automatically modify all olap tables to light schema change. The 
modification results can be viewed through the command `show 
convert_light_schema_change [from db]`, and the conversion results of all 
non-light schema change tables will be displayed.
+
diff --git 
a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md
 
b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md
new file mode 100644
index 0000000000..91918c4452
--- /dev/null
+++ 
b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHR-SCHEMA-CHANGE-PROCESS.md
@@ -0,0 +1,62 @@
+---
+{
+    "title": "SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS
+
+### Name
+
+SHOW CONVERT LIGHT SCHEMA CHANGE PROCESS
+
+### Description
+
+This statement is used to show the process of converting light schema change 
process. should enable config `enable_convert_light_weight_schema_change`.
+
+grammar:
+
+```sql
+SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS [FROM DATABASE db]
+```
+
+### Example
+
+1. View the converting process in db named test 
+
+    ```sql
+     SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS FROM DATABASE test;
+    ````
+
+2. View the converting process globally
+
+    ```sql
+    SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS;
+    ```
+
+
+### Keywords
+
+    SHOW, CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS
+
+### Best Practice
\ No newline at end of file
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md 
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index fc71a939a4..e8be232bfc 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2754,3 +2754,9 @@ show data (其他用法:HELP SHOW DATA)
 默认值:4
 
 此变量表示增加与/运算符执行的除法操作结果规模的位数。默认为4。
+
+#### `enable_convert_light_weight_schema_change`
+
+默认值:true
+
+暂时性配置项,开启后会启动后台线程自动将所有的olap表修改为可light schema change,修改结果可通过命令`show 
convert_light_schema_change [from db]` 来查看,将会展示所有非light schema change表的转换结果
\ No newline at end of file
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md
new file mode 100644
index 0000000000..7d084a5515
--- /dev/null
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS.md
@@ -0,0 +1,62 @@
+---
+{
+    "title": "SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS",
+    "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## SHOW-CONVERT-LIGHT-SCHEMA-CHANGE-PROCESS
+
+### Name
+
+SHOW CONVERT LIGHT SCHEMA CHANGE PROCESS
+
+### Description
+
+用来查看将非light schema change的olpa表转换为light schema change表的情况, 
需要开启配置`enable_convert_light_weight_schema_change`
+
+语法:
+
+```sql
+SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS [FROM DATABASE db]
+```
+
+### Example
+
+1. 查看在database test上的转换情况
+
+    ```sql
+     SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS FROM DATABASE test;
+    ````
+
+2. 查看全局的转换情况
+
+    ```sql
+    SHOW CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS;
+    ```
+
+
+### Keywords
+
+    SHOW, CONVERT_LIGHT_SCHEMA_CHANGE_PROCESS
+
+### Best Practice
\ No newline at end of file
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 705c93b8c3..b95139d3b1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2100,4 +2100,14 @@ public class Config extends ConfigBase {
 
     @ConfField
     public static boolean forbid_running_alter_job = false;
+
+    @ConfField
+    public static int table_stats_health_threshold = 80;
+
+    @ConfField(description = {
+            "暂时性配置项,开启后会自动将所有的olap表修改为可light schema change",
+            "temporary config filed, will make all olap tables enable light 
schema change"
+    })
+    public static boolean enable_convert_light_weight_schema_change = true;
+
 }
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 39fa006140..dd8dceee64 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -640,7 +640,8 @@ terminal String
     KW_PREPARE,
     KW_EXECUTE,
     KW_LINES,
-    KW_IGNORE;
+    KW_IGNORE,
+    KW_CONVERT_LSC;
 
 terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, 
LBRACKET, RBRACKET, LBRACE, RBRACE, DIVIDE, MOD, ADD, SUBTRACT, PLACEHOLDER, 
ARROW;
 terminal BITAND, BITOR, BITXOR, BITNOT;
@@ -4158,6 +4159,10 @@ show_param ::=
     {:
         RESULT = new ShowBuildIndexStmt(db, parser.where, orderByClause, 
limitClause);
     :}
+    | KW_CONVERT_LSC KW_FROM opt_db:db
+    {:
+        RESULT = new ShowConvertLSCStmt(db);
+    :}
     ;
 
 opt_tmp ::=
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
index 6dab84bf80..1e77b0a5ae 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
@@ -26,7 +26,6 @@ import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Tablet;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.persist.AlterLightSchemaChangeInfo;
 import org.apache.doris.proto.InternalService.PFetchColIdsRequest;
@@ -42,8 +41,8 @@ import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.base.Preconditions;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -52,6 +51,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -79,12 +79,11 @@ public class AlterLightSchChangeHelper {
      * 2. refresh table metadata
      * 3. write edit log
      */
-    public void enableLightSchemaChange() throws DdlException {
-        final Map<Long, PFetchColIdsRequest> params = initParams();
-        final AlterLightSchemaChangeInfo info = callForColumnsInfo(params);
+    public void enableLightSchemaChange() throws IllegalStateException {
+        final AlterLightSchemaChangeInfo info = callForColumnsInfo();
         updateTableMeta(info);
         Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(info);
-        LOG.info("successfully enable `light_schema_change`");
+        LOG.info("successfully enable `light_schema_change`, db={}, tbl={}", 
db.getFullName(), olapTable.getName());
     }
 
     /**
@@ -137,35 +136,36 @@ public class AlterLightSchChangeHelper {
     }
 
     /**
-     * @param beIdToRequest rpc param for corresponding BEs
      * @return contains indexIds to each tablet schema info which consists of 
columnName to corresponding
      * column unique id pairs
-     * @throws DdlException as a wrapper for rpc failures
+     * @throws IllegalStateException as a wrapper for rpc failures
      */
-    private AlterLightSchemaChangeInfo callForColumnsInfo(Map<Long, 
PFetchColIdsRequest> beIdToRequest)
-            throws DdlException {
-        final List<Future<PFetchColIdsResponse>> futureList = new 
ArrayList<>();
-        // start a rpc in a pipeline way
+    public AlterLightSchemaChangeInfo callForColumnsInfo()
+            throws IllegalStateException {
+        Map<Long, PFetchColIdsRequest> beIdToRequest = initParams();
+        Map<Long, Future<PFetchColIdsResponse>> beIdToRespFuture = new 
HashMap<>();
         try {
             for (Long beId : beIdToRequest.keySet()) {
                 final Backend backend = 
Env.getCurrentSystemInfo().getIdToBackend().get(beId);
-                final TNetworkAddress address = new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+                final TNetworkAddress address =
+                        new 
TNetworkAddress(Objects.requireNonNull(backend).getHost(), 
backend.getBrpcPort());
                 final Future<PFetchColIdsResponse> responseFuture = 
BackendServiceProxy.getInstance()
                         .getColumnIdsByTabletIds(address, 
beIdToRequest.get(beId));
-                futureList.add(responseFuture);
+                beIdToRespFuture.put(beId, responseFuture);
             }
         } catch (RpcException e) {
-            throw new DdlException("fetch columnIds RPC failed", e);
+            throw new IllegalStateException("fetch columnIds RPC failed", e);
         }
         // wait for and get results
         final long start = System.currentTimeMillis();
         long timeoutMs = ConnectContext.get().getExecTimeout() * 1000L;
         final List<PFetchColIdsResponse> resultList = new ArrayList<>();
         try {
-            for (Future<PFetchColIdsResponse> future : futureList) {
-                final PFetchColIdsResponse response = future.get(timeoutMs, 
TimeUnit.MILLISECONDS);
+            for (Map.Entry<Long, Future<PFetchColIdsResponse>> entry : 
beIdToRespFuture.entrySet()) {
+                final PFetchColIdsResponse response = 
entry.getValue().get(timeoutMs, TimeUnit.MILLISECONDS);
                 if (response.getStatus().getStatusCode() != 
TStatusCode.OK.getValue()) {
-                    throw new 
DdlException(response.getStatus().getErrorMsgs(0));
+                    throw new IllegalStateException(String.format("fail to get 
column info from be: %s, msg:%s",
+                            entry.getKey(), 
response.getStatus().getErrorMsgs(0)));
                 }
                 resultList.add(response);
                 // refresh the timeout
@@ -176,9 +176,9 @@ public class AlterLightSchChangeHelper {
                         "impossible state, timeout should happened");
             }
         } catch (InterruptedException | ExecutionException e) {
-            throw new DdlException("fetch columnIds RPC result failed: ", e);
+            throw new IllegalStateException("fetch columnIds RPC result 
failed: ", e);
         } catch (TimeoutException e) {
-            throw new DdlException("fetch columnIds RPC result timeout", e);
+            throw new IllegalStateException("fetch columnIds RPC result 
timeout", e);
         }
         return compactToAlterLscInfo(resultList);
     }
@@ -207,7 +207,7 @@ public class AlterLightSchChangeHelper {
         return new AlterLightSchemaChangeInfo(db.getId(), olapTable.getId(), 
indexIdToTabletInfo);
     }
 
-    public void updateTableMeta(AlterLightSchemaChangeInfo info) throws 
DdlException {
+    public void updateTableMeta(AlterLightSchemaChangeInfo info) throws 
IllegalStateException {
         Preconditions.checkNotNull(info, "passed in info should be not null");
         // update index-meta once and for all
         // schema pair: <maxColId, columns>
@@ -242,10 +242,9 @@ public class AlterLightSchChangeHelper {
                 indexMeta.setSchema(schemaPair.second);
             }
         } catch (IOException e) {
-            throw new DdlException("fail to reset index schema", e);
+            throw new IllegalStateException("fail to reset index schema", e);
         }
         // write table property
         olapTable.setEnableLightSchemaChange(true);
-        LOG.info("successfully update table meta for `light_schema_change`");
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 16c3da53d4..3de1d43163 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2014,7 +2014,12 @@ public class SchemaChangeHandler extends AlterHandler {
 
     private void enableLightSchemaChange(Database db, OlapTable olapTable) 
throws DdlException {
         final AlterLightSchChangeHelper alterLightSchChangeHelper = new 
AlterLightSchChangeHelper(db, olapTable);
-        alterLightSchChangeHelper.enableLightSchemaChange();
+        try {
+            alterLightSchChangeHelper.enableLightSchemaChange();
+        } catch (IllegalStateException e) {
+            throw new DdlException(String.format("failed to enable light 
schema change for table %s.%s",
+                    db.getFullName(), olapTable.getName()), e);
+        }
     }
 
     public void replayAlterLightSchChange(AlterLightSchemaChangeInfo info) 
throws MetaNotFoundException {
@@ -2024,7 +2029,7 @@ public class SchemaChangeHandler extends AlterHandler {
         final AlterLightSchChangeHelper alterLightSchChangeHelper = new 
AlterLightSchChangeHelper(db, olapTable);
         try {
             alterLightSchChangeHelper.updateTableMeta(info);
-        } catch (DdlException e) {
+        } catch (IllegalStateException e) {
             LOG.warn("failed to replay alter light schema change", e);
         } finally {
             olapTable.writeUnlock();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowConvertLSCStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowConvertLSCStmt.java
new file mode 100644
index 0000000000..bab2e3b34b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowConvertLSCStmt.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSetMetaData;
+
+public class ShowConvertLSCStmt extends ShowStmt {
+
+    private final String dbName;
+
+    public ShowConvertLSCStmt(String dbName) {
+        this.dbName = dbName;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+
+        if 
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)
+                && 
!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
+                PrivPredicate.OPERATOR)) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN/OPERATOR");
+        }
+    }
+
+    @Override
+    public ShowResultSetMetaData getMetaData() {
+        ShowResultSetMetaData.Builder builder = 
ShowResultSetMetaData.builder();
+        Column databaseColumn = new Column("database", 
ScalarType.createVarcharType(30));
+        Column tableNameColumn = new Column("table", 
ScalarType.createVarcharType(30));
+        Column statusColum = new Column("status", 
ScalarType.createVarcharType(30));
+        builder.addColumn(databaseColumn);
+        builder.addColumn(tableNameColumn);
+        builder.addColumn(statusColum);
+        return builder.build();
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.FORWARD_NO_SYNC;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnIdFlushDaemon.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnIdFlushDaemon.java
new file mode 100644
index 0000000000..28007a8e66
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColumnIdFlushDaemon.java
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.alter.AlterLightSchChangeHelper;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.persist.AlterLightSchemaChangeInfo;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+
+/**
+ * note(tsy): this class is temporary, make table before 1.2 to enable light 
schema change
+ */
+public class ColumnIdFlushDaemon extends MasterDaemon {
+
+    private static final Logger LOG = 
LogManager.getLogger(ColumnIdFlushDaemon.class);
+
+    /**
+     * db name -> (tbl name -> status)
+     */
+    private final Map<String, Map<String, FlushStatus>> resultCollector;
+
+    private final ReadWriteLock rwLock;
+
+    private final BiConsumer<Database, OlapTable> flushFunc;
+
+    public ColumnIdFlushDaemon() {
+        super("colum-id-flusher", TimeUnit.HOURS.toMillis(1));
+        resultCollector = Maps.newHashMap();
+        rwLock = new ReentrantReadWriteLock();
+        if (Config.enable_convert_light_weight_schema_change) {
+            flushFunc = this::doFlush;
+        } else {
+            flushFunc = (db, table) -> record(db.getFullName(), 
table.getName(), FlushStatus.init());
+        }
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        flush();
+    }
+
+    private void flush() {
+        List<Database> dbs = Env.getCurrentEnv().getInternalCatalog().getDbs();
+        for (Database db : dbs) {
+            rwLock.writeLock().lock();
+            try {
+                db.getTables()
+                        .stream()
+                        .filter(table -> table instanceof OlapTable)
+                        .map(table -> (OlapTable) table)
+                        .filter(olapTable -> 
!olapTable.getTableProperty().getUseSchemaLightChange())
+                        .forEach(table -> flushFunc.accept(db, table));
+            } finally {
+                rwLock.writeLock().unlock();
+            }
+            try {
+                // avoid too often to call be
+                sleep(3000);
+            } catch (InterruptedException ignore) {
+                // do nothing
+            }
+        }
+    }
+
+    private void doFlush(Database db, OlapTable table) {
+        record(db.getFullName(), table.getName(), FlushStatus.init());
+        AlterLightSchChangeHelper schChangeHelper = new 
AlterLightSchChangeHelper(db, table);
+        AlterLightSchemaChangeInfo changeInfo;
+        try {
+            changeInfo = schChangeHelper.callForColumnsInfo();
+        } catch (IllegalStateException e) {
+            record(db.getFullName(), table.getName(), 
FlushStatus.failed(e.getMessage()));
+            return;
+        }
+        table.writeLock();
+        try {
+            if (table.getTableProperty().getUseSchemaLightChange()) {
+                removeRecord(db.getFullName(), table.getName());
+                return;
+            }
+            schChangeHelper.updateTableMeta(changeInfo);
+            
Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(changeInfo);
+            LOG.info("successfully enable `light_schema_change`, db={}, 
tbl={}",
+                    db.getFullName(), table.getName());
+            removeRecord(db.getFullName(), table.getName());
+        } catch (IllegalStateException e) {
+            record(db.getFullName(), table.getName(), 
FlushStatus.failed(e.getMessage()));
+        } finally {
+            table.writeUnlock();
+        }
+    }
+
+    private void record(String dbName, String tableName, FlushStatus status) {
+        resultCollector.putIfAbsent(dbName, Maps.newHashMap());
+        Map<String, FlushStatus> tableToStatus = resultCollector.get(dbName);
+        tableToStatus.put(tableName, status);
+    }
+
+    private void removeRecord(String dbName, String tableName) {
+        Map<String, FlushStatus> tableToStatus;
+        if (resultCollector.containsKey(dbName)
+                && (tableToStatus = 
resultCollector.get(dbName)).containsKey(tableName)) {
+            tableToStatus.remove(tableName);
+            if (tableToStatus.isEmpty()) {
+                resultCollector.remove(dbName);
+            }
+        }
+    }
+
+    public Map<String, Map<String, FlushStatus>> getResultCollector() {
+        return resultCollector;
+    }
+
+    public void readLock() {
+        rwLock.readLock().lock();
+    }
+
+    public void readUnlock() {
+        rwLock.readLock().unlock();
+    }
+
+    public static class FlushStatus {
+
+        private FlushStatus() {
+            this.success = true;
+            this.msg = "Waiting to be converted";
+        }
+
+        private FlushStatus(String msg) {
+            this.success = false;
+            this.msg = msg;
+        }
+
+        public static FlushStatus init() {
+            return new FlushStatus();
+        }
+
+        public static FlushStatus failed(String reason) {
+            return new FlushStatus(reason);
+        }
+
+        public boolean isSuccess() {
+            return success;
+        }
+
+        public String getMsg() {
+            return msg;
+        }
+
+        private final boolean success;
+
+        private final String msg;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 554712db8d..9fbed8b63d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -333,6 +333,8 @@ public class Env {
     private Daemon timePrinter;
     private Daemon listener;
 
+    private ColumnIdFlushDaemon columnIdFlusher;
+
     private boolean isFirstTimeStartUp = false;
     private boolean isElectable;
     // set to true after finished replay all meta and ready to serve
@@ -674,6 +676,7 @@ public class Env {
         this.hiveTransactionMgr = new HiveTransactionMgr();
         this.binlogManager = new BinlogManager();
         this.binlogGcer = new BinlogGcer();
+        this.columnIdFlusher = new ColumnIdFlushDaemon();
     }
 
     public static void destroyCheckpoint() {
@@ -1503,6 +1506,7 @@ public class Env {
 
         // binlog gcer
         binlogGcer.start();
+        columnIdFlusher.start();
     }
 
     // start threads that should running on all FE
@@ -5505,4 +5509,8 @@ public class Env {
         queryStats.clear(info);
         editLog.logCleanQueryStats(info);
     }
+
+    public ColumnIdFlushDaemon getColumnIdFlusher() {
+        return columnIdFlusher;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 9c3e1b1b31..0ebcb1c7dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -40,6 +40,7 @@ import org.apache.doris.analysis.ShowCollationStmt;
 import org.apache.doris.analysis.ShowColumnHistStmt;
 import org.apache.doris.analysis.ShowColumnStatsStmt;
 import org.apache.doris.analysis.ShowColumnStmt;
+import org.apache.doris.analysis.ShowConvertLSCStmt;
 import org.apache.doris.analysis.ShowCreateCatalogStmt;
 import org.apache.doris.analysis.ShowCreateDbStmt;
 import org.apache.doris.analysis.ShowCreateFunctionStmt;
@@ -110,6 +111,7 @@ import org.apache.doris.backup.Repository;
 import org.apache.doris.backup.RestoreJob;
 import org.apache.doris.blockrule.SqlBlockRule;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ColumnIdFlushDaemon;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.DynamicPartitionProperty;
@@ -236,6 +238,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -427,6 +430,8 @@ public class ShowExecutor {
             handleShowBuildIndexStmt();
         } else if (stmt instanceof ShowAnalyzeTaskStatus) {
             handleShowAnalyzeTaskStatus();
+        } else if (stmt instanceof ShowConvertLSCStmt) {
+            handleShowConvertLSC();
         } else {
             handleEmtpy();
         }
@@ -2828,5 +2833,43 @@ public class ShowExecutor {
         resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
     }
 
+
+    private void handleShowConvertLSC() {
+        ShowConvertLSCStmt showStmt = (ShowConvertLSCStmt) stmt;
+        ColumnIdFlushDaemon columnIdFlusher = 
Env.getCurrentEnv().getColumnIdFlusher();
+        columnIdFlusher.readLock();
+        List<List<String>> rows;
+        try {
+            Map<String, Map<String, ColumnIdFlushDaemon.FlushStatus>> 
resultCollector =
+                    columnIdFlusher.getResultCollector();
+            rows = new ArrayList<>();
+            String db = ((ShowConvertLSCStmt) stmt).getDbName();
+            if (db != null) {
+                Map<String, ColumnIdFlushDaemon.FlushStatus> tblNameToStatus = 
resultCollector.get(db);
+                if (tblNameToStatus != null) {
+                    tblNameToStatus.forEach((tblName, status) -> {
+                        List<String> row = new ArrayList<>();
+                        row.add(db);
+                        row.add(tblName);
+                        row.add(status.getMsg());
+                        rows.add(row);
+                    });
+                }
+            } else {
+                resultCollector.forEach((dbName, tblNameToStatus) ->
+                        tblNameToStatus.forEach((tblName, status) -> {
+                            List<String> row = new ArrayList<>();
+                            row.add(dbName);
+                            row.add(tblName);
+                            row.add(status.getMsg());
+                            rows.add(row);
+                        }));
+            }
+        } finally {
+            columnIdFlusher.readUnlock();
+        }
+        resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
+    }
+
 }
 
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex 
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index cc4c11607b..56f8faae18 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -493,6 +493,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES));
         keywordMap.put("ignore", new Integer(SqlParserSymbols.KW_IGNORE));
         keywordMap.put("expired", new Integer(SqlParserSymbols.KW_EXPIRED));
+        keywordMap.put("convert_light_schema_change_process", new 
Integer(SqlParserSymbols.KW_CONVERT_LSC));
    }
     
   // map from token id to token description


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to