This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b983cbd02d410e0e737e540403931fdd0a686e3c
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Tue Jan 30 17:12:09 2024 +0800

    [fix](audit) add workload_group to audit log table (#30470)
    
    1. Add the missing workload_group column to the audit table
    2. Extract the definitions of the internal schema's tables into a new class
    3. Fix a bug where the audit loader had no authorization to load data into
       audit_table, introduced by #29790
    4. Fix a bug where audit_log could not be modified to 3 replicas because it
       is a partitioned table
---
 .../org/apache/doris/analysis/CreateTableStmt.java |   2 +-
 .../org/apache/doris/catalog/InternalSchema.java   | 113 +++++++++++++++++
 .../doris/catalog/InternalSchemaInitializer.java   | 133 ++++++++-------------
 .../doris/plugin/audit/AuditLoaderPlugin.java      |   1 +
 .../doris/plugin/audit/AuditStreamLoader.java      |   8 +-
 .../apache/doris/service/FrontendServiceImpl.java  |   8 +-
 .../org/apache/doris/system/SystemInfoService.java |  13 ++
 .../doris/transaction/DatabaseTransactionMgr.java  |   4 +-
 .../apache/doris/transaction/TransactionState.java |   3 +
 .../doris/alter/InternalSchemaAlterTest.java       |  74 ++++++++++++
 .../doris/statistics/AnalysisTaskExecutorTest.java |   2 +-
 .../org/apache/doris/statistics/AnalyzeTest.java   |   2 +-
 .../apache/doris/utframe/TestWithFeService.java    |   4 +
 13 files changed, 277 insertions(+), 90 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 17dd5f396c8..ae6958a70c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -98,7 +98,7 @@ public class CreateTableStmt extends DdlStmt {
         engineNames.add("broker");
     }
 
-    // if auto bucket auto bucket enable, rewrite distribution bucket num &&
+    // if auto bucket enable, rewrite distribution bucket num &&
     // set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true"
     private static Map<String, String> 
maybeRewriteByAutoBucket(DistributionDesc distributionDesc,
             Map<String, String> properties) throws AnalysisException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
new file mode 100644
index 00000000000..7d348761704
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.analysis.ColumnDef;
+import org.apache.doris.analysis.TypeDef;
+import org.apache.doris.common.UserException;
+import org.apache.doris.plugin.audit.AuditLoaderPlugin;
+import org.apache.doris.statistics.StatisticConstants;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InternalSchema {
+
+    // Do not use the original schema directly, because it may be modified by 
create table operation.
+    public static final List<ColumnDef> COL_STATS_SCHEMA;
+    public static final List<ColumnDef> HISTO_STATS_SCHEMA;
+    public static final List<ColumnDef> AUDIT_SCHEMA;
+
+    static {
+        // column statistics table
+        COL_STATS_SCHEMA = new ArrayList<>();
+        COL_STATS_SCHEMA.add(new ColumnDef("id", 
TypeDef.createVarchar(StatisticConstants.ID_LEN)));
+        COL_STATS_SCHEMA.add(new ColumnDef("catalog_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+        COL_STATS_SCHEMA.add(new ColumnDef("db_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+        COL_STATS_SCHEMA.add(new ColumnDef("tbl_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+        COL_STATS_SCHEMA.add(new ColumnDef("idx_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+        COL_STATS_SCHEMA.add(new ColumnDef("col_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+        COL_STATS_SCHEMA.add(new ColumnDef("part_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN), true));
+        COL_STATS_SCHEMA.add(new ColumnDef("count", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        COL_STATS_SCHEMA.add(new ColumnDef("ndv", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        COL_STATS_SCHEMA.add(new ColumnDef("null_count", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        COL_STATS_SCHEMA.add(new ColumnDef("min", 
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
+        COL_STATS_SCHEMA.add(new ColumnDef("max", 
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
+        COL_STATS_SCHEMA.add(new ColumnDef("data_size_in_bytes", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        COL_STATS_SCHEMA.add(new ColumnDef("update_time", 
TypeDef.create(PrimitiveType.DATETIME)));
+
+        // histogram_statistics table
+        HISTO_STATS_SCHEMA = new ArrayList<>();
+        HISTO_STATS_SCHEMA.add(new ColumnDef("id", 
TypeDef.createVarchar(StatisticConstants.ID_LEN)));
+        HISTO_STATS_SCHEMA.add(new ColumnDef("catalog_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+        HISTO_STATS_SCHEMA.add(new ColumnDef("db_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+        HISTO_STATS_SCHEMA.add(new ColumnDef("tbl_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+        HISTO_STATS_SCHEMA.add(new ColumnDef("idx_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+        HISTO_STATS_SCHEMA.add(new ColumnDef("col_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
+        HISTO_STATS_SCHEMA.add(new ColumnDef("sample_rate", 
TypeDef.create(PrimitiveType.DOUBLE)));
+        HISTO_STATS_SCHEMA.add(new ColumnDef("buckets", 
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
+        HISTO_STATS_SCHEMA.add(new ColumnDef("update_time", 
TypeDef.create(PrimitiveType.DATETIME)));
+
+        // audit table
+        AUDIT_SCHEMA = new ArrayList<>();
+        AUDIT_SCHEMA.add(new ColumnDef("query_id", TypeDef.createVarchar(48), 
true));
+        AUDIT_SCHEMA.add(new ColumnDef("time", 
TypeDef.create(PrimitiveType.DATETIME), true));
+        AUDIT_SCHEMA.add(new ColumnDef("client_ip", 
TypeDef.createVarchar(128), true));
+        AUDIT_SCHEMA.add(new ColumnDef("user", TypeDef.createVarchar(128), 
true));
+        AUDIT_SCHEMA.add(new ColumnDef("catalog", TypeDef.createVarchar(128), 
true));
+        AUDIT_SCHEMA.add(new ColumnDef("db", TypeDef.createVarchar(128), 
true));
+        AUDIT_SCHEMA.add(new ColumnDef("state", TypeDef.createVarchar(128), 
true));
+        AUDIT_SCHEMA.add(new ColumnDef("error_code", 
TypeDef.create(PrimitiveType.INT), true));
+        AUDIT_SCHEMA.add(new ColumnDef("error_message", 
TypeDef.create(PrimitiveType.STRING), true));
+        AUDIT_SCHEMA.add(new ColumnDef("query_time", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_SCHEMA.add(new ColumnDef("scan_bytes", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_SCHEMA.add(new ColumnDef("scan_rows", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_SCHEMA.add(new ColumnDef("return_rows", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_SCHEMA.add(new ColumnDef("stmt_id", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_SCHEMA.add(new ColumnDef("is_query", 
TypeDef.create(PrimitiveType.TINYINT), true));
+        AUDIT_SCHEMA.add(new ColumnDef("frontend_ip", 
TypeDef.createVarchar(128), true));
+        AUDIT_SCHEMA.add(new ColumnDef("cpu_time_ms", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_SCHEMA.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), 
true));
+        AUDIT_SCHEMA.add(new ColumnDef("sql_digest", 
TypeDef.createVarchar(128), true));
+        AUDIT_SCHEMA.add(new ColumnDef("peak_memory_bytes", 
TypeDef.create(PrimitiveType.BIGINT), true));
+        AUDIT_SCHEMA.add(new ColumnDef("workload_group", 
TypeDef.create(PrimitiveType.STRING), true));
+        AUDIT_SCHEMA.add(new ColumnDef("stmt", 
TypeDef.create(PrimitiveType.STRING), true));
+    }
+
+    // Get a copy of the schema of the given internal table (column statistics, histogram or audit log).
+    // Do not use the original schema directly, because it may be modified by 
create table operation.
+    public static List<ColumnDef> getCopiedSchema(String tblName) throws 
UserException {
+        List<ColumnDef> schema;
+        if (tblName.equals(StatisticConstants.STATISTIC_TBL_NAME)) {
+            schema = COL_STATS_SCHEMA;
+        } else if (tblName.equals(StatisticConstants.HISTOGRAM_TBL_NAME)) {
+            schema = HISTO_STATS_SCHEMA;
+        } else if (tblName.equals(AuditLoaderPlugin.AUDIT_LOG_TABLE)) {
+            schema = AUDIT_SCHEMA;
+        } else {
+            throw new UserException("Unknown internal table name: " + tblName);
+        }
+        List<ColumnDef> copiedSchema = Lists.newArrayList();
+        for (ColumnDef columnDef : schema) {
+            copiedSchema.add(new ColumnDef(columnDef.getName(), 
columnDef.getTypeDef(), columnDef.isAllowNull()));
+        }
+        return copiedSchema;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index d53520b133c..169e2fac80b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -17,17 +17,18 @@
 
 package org.apache.doris.catalog;
 
-import org.apache.doris.analysis.ColumnDef;
+import org.apache.doris.analysis.AlterClause;
+import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.DropTableStmt;
 import org.apache.doris.analysis.HashDistributionDesc;
 import org.apache.doris.analysis.KeysDesc;
+import org.apache.doris.analysis.ModifyPartitionClause;
 import org.apache.doris.analysis.PartitionDesc;
 import org.apache.doris.analysis.RangePartitionDesc;
 import org.apache.doris.analysis.TableName;
-import org.apache.doris.analysis.TypeDef;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
@@ -56,33 +57,6 @@ public class InternalSchemaInitializer extends Thread {
 
     private static final Logger LOG = 
LogManager.getLogger(InternalSchemaInitializer.class);
 
-    public static final List<ColumnDef> AUDIT_TABLE_COLUMNS;
-
-    static {
-        AUDIT_TABLE_COLUMNS = new ArrayList<>();
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_id", 
TypeDef.createVarchar(48), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("time", 
TypeDef.create(PrimitiveType.DATETIME), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("client_ip", 
TypeDef.createVarchar(128), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("user", 
TypeDef.createVarchar(128), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("catalog", 
TypeDef.createVarchar(128), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("db", 
TypeDef.createVarchar(128), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("state", 
TypeDef.createVarchar(128), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_code", 
TypeDef.create(PrimitiveType.INT), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_message", 
TypeDef.create(PrimitiveType.STRING), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_time", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_bytes", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_rows", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("return_rows", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt_id", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("is_query", 
TypeDef.create(PrimitiveType.TINYINT), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("frontend_ip", 
TypeDef.createVarchar(128), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("cpu_time_ms", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_hash", 
TypeDef.createVarchar(128), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_digest", 
TypeDef.createVarchar(128), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("peak_memory_bytes", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt", 
TypeDef.create(PrimitiveType.STRING), true));
-    }
-
     public void run() {
         if (!FeConstants.enableInternalSchemaDb) {
             return;
@@ -97,7 +71,7 @@ public class InternalSchemaInitializer extends Thread {
                 }
                 Thread.currentThread()
                         .join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 
1000L);
-                createDB();
+                createDb();
                 createTbl();
             } catch (Throwable e) {
                 LOG.warn("Statistics storage initiated failed, will try again 
later", e);
@@ -116,29 +90,51 @@ public class InternalSchemaInitializer extends Thread {
         modifyTblReplicaCount(database, AuditLoaderPlugin.AUDIT_LOG_TABLE);
     }
 
-    public void modifyTblReplicaCount(Database database, String tblName) {
+    @VisibleForTesting
+    public static void modifyTblReplicaCount(Database database, String 
tblName) {
         if (!(Config.min_replication_num_per_tablet < 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM
                 && Config.max_replication_num_per_tablet >= 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM)) {
             return;
         }
         while (true) {
-            if (Env.getCurrentSystemInfo().aliveBECount() >= 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
+            int backendNum = 
Env.getCurrentSystemInfo().getBackendNumFromDiffHosts(true);
+            if (FeConstants.runningUnitTest) {
+                backendNum = 
Env.getCurrentSystemInfo().getAllBackendIds().size();
+            }
+            if (backendNum >= 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
                 try {
-                    Map<String, String> props = new HashMap<>();
-                    
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, 
"tag.location.default: "
-                            + 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
-                    TableIf colStatsTbl = 
StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
+                    OlapTable tbl = (OlapTable) 
StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
                             StatisticConstants.DB_NAME, tblName);
-                    OlapTable olapTable = (OlapTable) colStatsTbl;
-                    if 
(olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum()
-                            >= 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
-                        return;
-                    }
-                    colStatsTbl.writeLock();
+                    tbl.writeLock();
                     try {
-                        
Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) 
colStatsTbl, props);
+                        if 
(tbl.getTableProperty().getReplicaAllocation().getTotalReplicaNum()
+                                >= 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
+                            return;
+                        }
+                        if (!tbl.isPartitionedTable()) {
+                            Map<String, String> props = new HashMap<>();
+                            
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, 
"tag.location.default: "
+                                    + 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
+                            
Env.getCurrentEnv().modifyTableReplicaAllocation(database, tbl, props);
+                        } else {
+                            TableName tableName = new 
TableName(InternalCatalog.INTERNAL_CATALOG_NAME,
+                                    StatisticConstants.DB_NAME, tbl.getName());
+                            // 1. modify table's default replica num
+                            Map<String, String> props = new HashMap<>();
+                            props.put("default." + 
PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
+                                    "" + 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
+                            
Env.getCurrentEnv().modifyTableDefaultReplicaAllocation(database, tbl, props);
+                            // 2. modify each partition's replica num
+                            List<AlterClause> clauses = Lists.newArrayList();
+                            props.clear();
+                            
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
+                                    "" + 
StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
+                            
clauses.add(ModifyPartitionClause.createStarClause(props, false));
+                            AlterTableStmt alter = new 
AlterTableStmt(tableName, clauses);
+                            Env.getCurrentEnv().alterTable(alter);
+                        }
                     } finally {
-                        colStatsTbl.writeUnlock();
+                        tbl.writeUnlock();
                     }
                     break;
                 } catch (Throwable t) {
@@ -153,7 +149,8 @@ public class InternalSchemaInitializer extends Thread {
         }
     }
 
-    private void createTbl() throws UserException {
+    @VisibleForTesting
+    public static void createTbl() throws UserException {
         // statistics
         
Env.getCurrentEnv().getInternalCatalog().createTable(buildStatisticsTblStmt());
         
Env.getCurrentEnv().getInternalCatalog().createTable(buildHistogramTblStmt());
@@ -162,7 +159,7 @@ public class InternalSchemaInitializer extends Thread {
     }
 
     @VisibleForTesting
-    public static void createDB() {
+    public static void createDb() {
         CreateDbStmt createDbStmt = new CreateDbStmt(true, 
FeConstants.INTERNAL_DB_NAME,
                 null);
         try {
@@ -173,27 +170,9 @@ public class InternalSchemaInitializer extends Thread {
         }
     }
 
-    @VisibleForTesting
-    public CreateTableStmt buildStatisticsTblStmt() throws UserException {
+    private static CreateTableStmt buildStatisticsTblStmt() throws 
UserException {
         TableName tableName = new TableName("",
                 FeConstants.INTERNAL_DB_NAME, 
StatisticConstants.STATISTIC_TBL_NAME);
-        List<ColumnDef> columnDefs = new ArrayList<>();
-        columnDefs.add(new ColumnDef("id", 
TypeDef.createVarchar(StatisticConstants.ID_LEN)));
-        columnDefs.add(new ColumnDef("catalog_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
-        columnDefs.add(new ColumnDef("db_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
-        columnDefs.add(new ColumnDef("tbl_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
-        columnDefs.add(new ColumnDef("idx_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
-        columnDefs.add(new ColumnDef("col_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
-        ColumnDef partId = new ColumnDef("part_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN));
-        partId.setAllowNull(true);
-        columnDefs.add(partId);
-        columnDefs.add(new ColumnDef("count", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        columnDefs.add(new ColumnDef("ndv", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        columnDefs.add(new ColumnDef("null_count", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        columnDefs.add(new ColumnDef("min", 
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
-        columnDefs.add(new ColumnDef("max", 
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
-        columnDefs.add(new ColumnDef("data_size_in_bytes", 
TypeDef.create(PrimitiveType.BIGINT), true));
-        columnDefs.add(new ColumnDef("update_time", 
TypeDef.create(PrimitiveType.DATETIME)));
         String engineName = "olap";
         ArrayList<String> uniqueKeys = Lists.newArrayList("id", "catalog_id",
                 "db_id", "tbl_id", "idx_id", "col_id", "part_id");
@@ -207,26 +186,16 @@ public class InternalSchemaInitializer extends Thread {
             }
         };
         CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
-                tableName, columnDefs, engineName, keysDesc, null, 
distributionDesc,
+                tableName, 
InternalSchema.getCopiedSchema(StatisticConstants.STATISTIC_TBL_NAME),
+                engineName, keysDesc, null, distributionDesc,
                 properties, null, "Doris internal statistics table, DO NOT 
MODIFY IT", null);
         StatisticsUtil.analyze(createTableStmt);
         return createTableStmt;
     }
 
-    @VisibleForTesting
-    public CreateTableStmt buildHistogramTblStmt() throws UserException {
+    private static CreateTableStmt buildHistogramTblStmt() throws 
UserException {
         TableName tableName = new TableName("",
                 FeConstants.INTERNAL_DB_NAME, 
StatisticConstants.HISTOGRAM_TBL_NAME);
-        List<ColumnDef> columnDefs = new ArrayList<>();
-        columnDefs.add(new ColumnDef("id", 
TypeDef.createVarchar(StatisticConstants.ID_LEN)));
-        columnDefs.add(new ColumnDef("catalog_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
-        columnDefs.add(new ColumnDef("db_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
-        columnDefs.add(new ColumnDef("tbl_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
-        columnDefs.add(new ColumnDef("idx_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
-        columnDefs.add(new ColumnDef("col_id", 
TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
-        columnDefs.add(new ColumnDef("sample_rate", 
TypeDef.create(PrimitiveType.DOUBLE)));
-        columnDefs.add(new ColumnDef("buckets", 
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
-        columnDefs.add(new ColumnDef("update_time", 
TypeDef.create(PrimitiveType.DATETIME)));
         String engineName = "olap";
         ArrayList<String> uniqueKeys = Lists.newArrayList("id", "catalog_id",
                 "db_id", "tbl_id", "idx_id", "col_id");
@@ -240,13 +209,14 @@ public class InternalSchemaInitializer extends Thread {
             }
         };
         CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
-                tableName, columnDefs, engineName, keysDesc, null, 
distributionDesc,
+                tableName, 
InternalSchema.getCopiedSchema(StatisticConstants.HISTOGRAM_TBL_NAME),
+                engineName, keysDesc, null, distributionDesc,
                 properties, null, "Doris internal statistics table, DO NOT 
MODIFY IT", null);
         StatisticsUtil.analyze(createTableStmt);
         return createTableStmt;
     }
 
-    private CreateTableStmt buildAuditTblStmt() throws UserException {
+    private static CreateTableStmt buildAuditTblStmt() throws UserException {
         TableName tableName = new TableName("",
                 FeConstants.INTERNAL_DB_NAME, 
AuditLoaderPlugin.AUDIT_LOG_TABLE);
 
@@ -271,7 +241,8 @@ public class InternalSchemaInitializer extends Thread {
             }
         };
         CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
-                tableName, AUDIT_TABLE_COLUMNS, engineName, keysDesc, 
partitionDesc, distributionDesc,
+                tableName, 
InternalSchema.getCopiedSchema(AuditLoaderPlugin.AUDIT_LOG_TABLE),
+                engineName, keysDesc, partitionDesc, distributionDesc,
                 properties, null, "Doris internal audit table, DO NOT MODIFY 
IT", null);
         StatisticsUtil.analyze(createTableStmt);
         return createTableStmt;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 533f50f062d..148fc460d5a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -161,6 +161,7 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         logBuffer.append(event.sqlHash).append("\t");
         logBuffer.append(event.sqlDigest).append("\t");
         logBuffer.append(event.peakMemoryBytes).append("\t");
+        logBuffer.append(event.workloadGroup).append("\t");
         // trim the query to avoid too long
         // use `getBytes().length` to get real byte length
         String stmt = truncateByBytes(event.stmt).replace("\n", " ")
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
index 07175478275..72b046f015a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.plugin.audit;
 
-import org.apache.doris.catalog.InternalSchemaInitializer;
+import org.apache.doris.catalog.InternalSchema;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 
@@ -48,7 +48,7 @@ public class AuditStreamLoader {
         this.auditLogTbl = AuditLoaderPlugin.AUDIT_LOG_TABLE;
         this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, 
auditLogTbl);
         // currently, FE identity is FE's IP, so we replace the "." in IP to 
make it suitable for label
-        this.feIdentity = hostPort.replaceAll("\\.", "_");
+        this.feIdentity = hostPort.replaceAll("\\.", "_").replaceAll(":", "_");
     }
 
     private HttpURLConnection getConnection(String urlStr, String label, 
String clusterToken) throws IOException {
@@ -63,7 +63,7 @@ public class AuditStreamLoader {
         conn.addRequestProperty("label", label);
         conn.addRequestProperty("max_filter_ratio", "1.0");
         conn.addRequestProperty("columns",
-                InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c 
-> c.getName()).collect(
+                InternalSchema.AUDIT_SCHEMA.stream().map(c -> 
c.getName()).collect(
                         Collectors.joining(",")));
         conn.setDoOutput(true);
         conn.setDoInput(true);
@@ -78,7 +78,7 @@ public class AuditStreamLoader {
         sb.append("-H \"").append("Content-Type\":").append("\"text/plain; 
charset=UTF-8\" \\\n  ");
         sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n  
");
         sb.append("-H \"").append("columns\":")
-                .append("\"" + 
InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> 
c.getName()).collect(
+                .append("\"" + InternalSchema.AUDIT_SCHEMA.stream().map(c -> 
c.getName()).collect(
                         Collectors.joining(",")) + "\" \\\n  ");
         sb.append("\"").append(conn.getURL()).append("\"");
         return sb.toString();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 85753a7330a..86e88bb2b20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1090,6 +1090,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             checkSingleTablePasswordAndPrivs(request.getUser(), 
request.getPasswd(), request.getDb(),
                     request.getTbl(),
                     request.getUserIp(), PrivPredicate.LOAD);
+        } else {
+            checkToken(request.getToken());
         }
 
         // check label
@@ -1111,9 +1113,13 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, 
TableType.OLAP);
         // begin
         long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : 
Config.stream_load_default_timeout_second;
+        TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, 
clientIp);
+        if (request.isSetToken()) {
+            txnCoord.isFromInternal = true;
+        }
         long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                 db.getId(), Lists.newArrayList(table.getId()), 
request.getLabel(), request.getRequestId(),
-                new TxnCoordinator(TxnSourceType.BE, clientIp),
+                txnCoord,
                 TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, 
timeoutSecond);
         TLoadTxnBeginResult result = new TLoadTxnBeginResult();
         result.setTxnId(txnId).setDbId(db.getId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 9c1e196923f..feb3ac81470 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -416,6 +416,19 @@ public class SystemInfoService {
         return idToBackendRef.values().stream().filter(backend -> 
backend.isComputeNode()).collect(Collectors.toList());
     }
 
+    // Return the number of distinct hosts among the backends (only alive backends if aliveOnly is true)
+    public int getBackendNumFromDiffHosts(boolean aliveOnly) {
+        Set<String> hosts = Sets.newHashSet();
+        ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
+        for (Backend backend : idToBackend.values()) {
+            if (aliveOnly && !backend.isAlive()) {
+                continue;
+            }
+            hosts.add(backend.getHost());
+        }
+        return hosts.size();
+    }
+
     class BeIdComparator implements Comparator<Backend> {
         public int compare(Backend a, Backend b) {
             return (int) (a.getId() - b.getId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 43233002935..e780bc02472 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -321,7 +321,9 @@ public class DatabaseTransactionMgr {
             throws DuplicatedRequestException, LabelAlreadyUsedException, 
BeginTransactionException,
             AnalysisException, QuotaExceedException, MetaNotFoundException {
         Database db = env.getInternalCatalog().getDbOrMetaException(dbId);
-        InternalDatabaseUtil.checkDatabase(db.getFullName(), 
ConnectContext.get());
+        if (!coordinator.isFromInternal) {
+            InternalDatabaseUtil.checkDatabase(db.getFullName(), 
ConnectContext.get());
+        }
         checkDatabaseDataQuota();
         Preconditions.checkNotNull(coordinator);
         Preconditions.checkNotNull(label);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index dbe63a0c79d..b7401416676 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -164,6 +164,9 @@ public class TransactionState implements Writable {
         public TxnSourceType sourceType;
         @SerializedName(value = "ip")
         public String ip;
+        // True if this txn is created by the system (such as writing data to audit 
table)
+        @SerializedName(value = "ii")
+        public boolean isFromInternal = false;
 
         public TxnCoordinator() {
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java
new file mode 100644
index 00000000000..2898e3a1c35
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.alter;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.InternalSchemaInitializer;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.plugin.audit.AuditLoaderPlugin;
+import org.apache.doris.statistics.StatisticConstants;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class InternalSchemaAlterTest extends TestWithFeService {
+
+    @Override
+    protected int backendNum() {
+        return 3;
+    }
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        InternalSchemaInitializer.createDb();
+        InternalSchemaInitializer.createTbl();
+        Config.allow_replica_on_same_host = true;
+        FeConstants.runningUnitTest = true;
+    }
+
+    @Test
+    public void testModifyTblReplicaCount() throws AnalysisException {
+        Database db = Env.getCurrentEnv().getCatalogMgr()
+                
.getInternalCatalog().getDbNullable(FeConstants.INTERNAL_DB_NAME);
+        InternalSchemaInitializer.modifyTblReplicaCount(db, 
StatisticConstants.STATISTIC_TBL_NAME);
+        InternalSchemaInitializer.modifyTblReplicaCount(db, 
StatisticConstants.HISTOGRAM_TBL_NAME);
+        InternalSchemaInitializer.modifyTblReplicaCount(db, 
AuditLoaderPlugin.AUDIT_LOG_TABLE);
+
+        checkReplicationNum(db, StatisticConstants.STATISTIC_TBL_NAME);
+        checkReplicationNum(db, StatisticConstants.HISTOGRAM_TBL_NAME);
+        checkReplicationNum(db, AuditLoaderPlugin.AUDIT_LOG_TABLE);
+    }
+
+    private void checkReplicationNum(Database db, String tblName) throws 
AnalysisException {
+        OlapTable olapTable = db.getOlapTableOrAnalysisException(tblName);
+        PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+        Assertions.assertEquals((short) 3, 
olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum(),
+                tblName);
+        for (Partition partition : olapTable.getPartitions()) {
+            Assertions.assertEquals((short) 3,
+                    
partitionInfo.getReplicaAllocation(partition.getId()).getTotalReplicaNum());
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index 1e187d74671..b17ba3e68db 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -54,7 +54,7 @@ public class AnalysisTaskExecutorTest extends 
TestWithFeService {
     @Override
     protected void runBeforeAll() throws Exception {
         try {
-            InternalSchemaInitializer.createDB();
+            InternalSchemaInitializer.createDb();
             createDatabase("analysis_job_test");
             connectContext.setDatabase("analysis_job_test");
             createTable("CREATE TABLE t1 (col1 int not null, col2 int not 
null, col3 int not null)\n"
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
index 85fa42071e9..483cd3c0326 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
@@ -55,7 +55,7 @@ public class AnalyzeTest extends TestWithFeService {
     @Override
     protected void runBeforeAll() throws Exception {
         try {
-            InternalSchemaInitializer.createDB();
+            InternalSchemaInitializer.createDb();
             createDatabase("analysis_job_test");
             connectContext.setDatabase("analysis_job_test");
             createTable("CREATE TABLE t1 (col1 int not null, col2 int not 
null, col3 int not null)\n"
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 1f57040d305..65070dd504b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -188,6 +188,10 @@ public abstract class TestWithFeService {
         return 1;
     }
 
+    protected boolean needDiffHost() {
+        return false;
+    }
+
     // Help to create a mocked ConnectContext.
     public static ConnectContext createDefaultCtx() throws IOException {
         return createCtx(UserIdentity.ROOT, "127.0.0.1");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to