This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit b983cbd02d410e0e737e540403931fdd0a686e3c Author: Mingyu Chen <morning...@163.com> AuthorDate: Tue Jan 30 17:12:09 2024 +0800 [fix](audit) add workload_group to audit log table (#30470) 1. Missing workload_group column in audit table 2. Extract the definition of internal schema's tables into a new class 3. Fix bug that audit loader has no authorization to load data to audit_table, introduced from #29790 4. Fix bug that audit_log can not be modified to 3 replica because it is partitioned table --- .../org/apache/doris/analysis/CreateTableStmt.java | 2 +- .../org/apache/doris/catalog/InternalSchema.java | 113 +++++++++++++++++ .../doris/catalog/InternalSchemaInitializer.java | 133 ++++++++------------- .../doris/plugin/audit/AuditLoaderPlugin.java | 1 + .../doris/plugin/audit/AuditStreamLoader.java | 8 +- .../apache/doris/service/FrontendServiceImpl.java | 8 +- .../org/apache/doris/system/SystemInfoService.java | 13 ++ .../doris/transaction/DatabaseTransactionMgr.java | 4 +- .../apache/doris/transaction/TransactionState.java | 3 + .../doris/alter/InternalSchemaAlterTest.java | 74 ++++++++++++ .../doris/statistics/AnalysisTaskExecutorTest.java | 2 +- .../org/apache/doris/statistics/AnalyzeTest.java | 2 +- .../apache/doris/utframe/TestWithFeService.java | 4 + 13 files changed, 277 insertions(+), 90 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 17dd5f396c8..ae6958a70c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -98,7 +98,7 @@ public class CreateTableStmt extends DdlStmt { engineNames.add("broker"); } - // if auto bucket auto bucket enable, rewrite distribution bucket num && + // if auto bucket enable, rewrite distribution bucket num && // set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true" private static Map<String, String> maybeRewriteByAutoBucket(DistributionDesc distributionDesc, Map<String, String> properties) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java new file mode 100644 index 00000000000..7d348761704 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.analysis.ColumnDef; +import org.apache.doris.analysis.TypeDef; +import org.apache.doris.common.UserException; +import org.apache.doris.plugin.audit.AuditLoaderPlugin; +import org.apache.doris.statistics.StatisticConstants; + +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.List; + +public class InternalSchema { + + // Do not use the original schema directly, because it may be modified by create table operation. + public static final List<ColumnDef> COL_STATS_SCHEMA; + public static final List<ColumnDef> HISTO_STATS_SCHEMA; + public static final List<ColumnDef> AUDIT_SCHEMA; + + static { + // column statistics table + COL_STATS_SCHEMA = new ArrayList<>(); + COL_STATS_SCHEMA.add(new ColumnDef("id", TypeDef.createVarchar(StatisticConstants.ID_LEN))); + COL_STATS_SCHEMA.add(new ColumnDef("catalog_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); + COL_STATS_SCHEMA.add(new ColumnDef("db_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); + COL_STATS_SCHEMA.add(new ColumnDef("tbl_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); + COL_STATS_SCHEMA.add(new ColumnDef("idx_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); + COL_STATS_SCHEMA.add(new ColumnDef("col_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); + COL_STATS_SCHEMA.add(new ColumnDef("part_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN), true)); + COL_STATS_SCHEMA.add(new ColumnDef("count", TypeDef.create(PrimitiveType.BIGINT), true)); + COL_STATS_SCHEMA.add(new ColumnDef("ndv", TypeDef.create(PrimitiveType.BIGINT), true)); + COL_STATS_SCHEMA.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT), true)); + COL_STATS_SCHEMA.add(new ColumnDef("min", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true)); + COL_STATS_SCHEMA.add(new ColumnDef("max", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true)); + COL_STATS_SCHEMA.add(new ColumnDef("data_size_in_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); + COL_STATS_SCHEMA.add(new ColumnDef("update_time", TypeDef.create(PrimitiveType.DATETIME))); + + // histogram_statistics table + HISTO_STATS_SCHEMA = new ArrayList<>(); + HISTO_STATS_SCHEMA.add(new ColumnDef("id", TypeDef.createVarchar(StatisticConstants.ID_LEN))); + HISTO_STATS_SCHEMA.add(new ColumnDef("catalog_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); + HISTO_STATS_SCHEMA.add(new ColumnDef("db_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); + HISTO_STATS_SCHEMA.add(new ColumnDef("tbl_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); + HISTO_STATS_SCHEMA.add(new ColumnDef("idx_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); + HISTO_STATS_SCHEMA.add(new ColumnDef("col_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); + HISTO_STATS_SCHEMA.add(new ColumnDef("sample_rate", TypeDef.create(PrimitiveType.DOUBLE))); + HISTO_STATS_SCHEMA.add(new ColumnDef("buckets", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))); + HISTO_STATS_SCHEMA.add(new ColumnDef("update_time", TypeDef.create(PrimitiveType.DATETIME))); + + // audit table + AUDIT_SCHEMA = new ArrayList<>(); + AUDIT_SCHEMA.add(new ColumnDef("query_id", TypeDef.createVarchar(48), true)); + AUDIT_SCHEMA.add(new ColumnDef("time", TypeDef.create(PrimitiveType.DATETIME), true)); + AUDIT_SCHEMA.add(new ColumnDef("client_ip", TypeDef.createVarchar(128), true)); + AUDIT_SCHEMA.add(new ColumnDef("user", TypeDef.createVarchar(128), true)); + AUDIT_SCHEMA.add(new ColumnDef("catalog", TypeDef.createVarchar(128), true)); + AUDIT_SCHEMA.add(new ColumnDef("db", TypeDef.createVarchar(128), true)); + AUDIT_SCHEMA.add(new ColumnDef("state", TypeDef.createVarchar(128), true)); + AUDIT_SCHEMA.add(new ColumnDef("error_code", TypeDef.create(PrimitiveType.INT), true)); + AUDIT_SCHEMA.add(new ColumnDef("error_message", TypeDef.create(PrimitiveType.STRING), true)); + AUDIT_SCHEMA.add(new ColumnDef("query_time", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true)); + AUDIT_SCHEMA.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true)); + AUDIT_SCHEMA.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true)); + AUDIT_SCHEMA.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_SCHEMA.add(new ColumnDef("workload_group", TypeDef.create(PrimitiveType.STRING), true)); + AUDIT_SCHEMA.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true)); + } + + // Get copied schema for statistic table + // Do not use the original schema directly, because it may be modified by create table operation. + public static List<ColumnDef> getCopiedSchema(String tblName) throws UserException { + List<ColumnDef> schema; + if (tblName.equals(StatisticConstants.STATISTIC_TBL_NAME)) { + schema = COL_STATS_SCHEMA; + } else if (tblName.equals(StatisticConstants.HISTOGRAM_TBL_NAME)) { + schema = HISTO_STATS_SCHEMA; + } else if (tblName.equals(AuditLoaderPlugin.AUDIT_LOG_TABLE)) { + schema = AUDIT_SCHEMA; + } else { + throw new UserException("Unknown internal table name: " + tblName); + } + List<ColumnDef> copiedSchema = Lists.newArrayList(); + for (ColumnDef columnDef : schema) { + copiedSchema.add(new ColumnDef(columnDef.getName(), columnDef.getTypeDef(), columnDef.isAllowNull())); + } + return copiedSchema; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index d53520b133c..169e2fac80b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -17,17 +17,18 @@ package org.apache.doris.catalog; -import org.apache.doris.analysis.ColumnDef; +import org.apache.doris.analysis.AlterClause; +import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.DistributionDesc; import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.HashDistributionDesc; import org.apache.doris.analysis.KeysDesc; +import org.apache.doris.analysis.ModifyPartitionClause; import org.apache.doris.analysis.PartitionDesc; import org.apache.doris.analysis.RangePartitionDesc; import org.apache.doris.analysis.TableName; -import org.apache.doris.analysis.TypeDef; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; @@ -56,33 +57,6 @@ public class InternalSchemaInitializer extends Thread { private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class); - public static final List<ColumnDef> AUDIT_TABLE_COLUMNS; - - static { - AUDIT_TABLE_COLUMNS = new ArrayList<>(); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_id", TypeDef.createVarchar(48), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("time", TypeDef.create(PrimitiveType.DATETIME), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("client_ip", TypeDef.createVarchar(128), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("user", TypeDef.createVarchar(128), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("catalog", TypeDef.createVarchar(128), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("db", TypeDef.createVarchar(128), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("state", TypeDef.createVarchar(128), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_code", TypeDef.create(PrimitiveType.INT), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_message", TypeDef.create(PrimitiveType.STRING), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_time", TypeDef.create(PrimitiveType.BIGINT), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); - AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true)); - } - public void run() { if (!FeConstants.enableInternalSchemaDb) { return; @@ -97,7 +71,7 @@ public class InternalSchemaInitializer extends Thread { } Thread.currentThread() .join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000L); - createDB(); + createDb(); createTbl(); } catch (Throwable e) { LOG.warn("Statistics storage initiated failed, will try again later", e); @@ -116,29 +90,51 @@ public class InternalSchemaInitializer extends Thread { modifyTblReplicaCount(database, AuditLoaderPlugin.AUDIT_LOG_TABLE); } - public void modifyTblReplicaCount(Database database, String tblName) { + @VisibleForTesting + public static void modifyTblReplicaCount(Database database, String tblName) { if (!(Config.min_replication_num_per_tablet < StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM && Config.max_replication_num_per_tablet >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM)) { return; } while (true) { - if (Env.getCurrentSystemInfo().aliveBECount() >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) { + int backendNum = Env.getCurrentSystemInfo().getBackendNumFromDiffHosts(true); + if (FeConstants.runningUnitTest) { + backendNum = Env.getCurrentSystemInfo().getAllBackendIds().size(); + } + if (backendNum >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) { try { - Map<String, String> props = new HashMap<>(); - props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, "tag.location.default: " - + StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM); - TableIf colStatsTbl = StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME, + OlapTable tbl = (OlapTable) StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME, StatisticConstants.DB_NAME, tblName); - OlapTable olapTable = (OlapTable) colStatsTbl; - if (olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum() - >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) { - return; - } - colStatsTbl.writeLock(); + tbl.writeLock(); try { - Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) colStatsTbl, props); + if (tbl.getTableProperty().getReplicaAllocation().getTotalReplicaNum() + >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) { + return; + } + if (!tbl.isPartitionedTable()) { + Map<String, String> props = new HashMap<>(); + props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, "tag.location.default: " + + StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM); + Env.getCurrentEnv().modifyTableReplicaAllocation(database, tbl, props); + } else { + TableName tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, + StatisticConstants.DB_NAME, tbl.getName()); + // 1. modify table's default replica num + Map<String, String> props = new HashMap<>(); + props.put("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_NUM, + "" + StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM); + Env.getCurrentEnv().modifyTableDefaultReplicaAllocation(database, tbl, props); + // 2. modify each partition's replica num + List<AlterClause> clauses = Lists.newArrayList(); + props.clear(); + props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM, + "" + StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM); + clauses.add(ModifyPartitionClause.createStarClause(props, false)); + AlterTableStmt alter = new AlterTableStmt(tableName, clauses); + Env.getCurrentEnv().alterTable(alter); + } } finally { - colStatsTbl.writeUnlock(); + tbl.writeUnlock(); } break; } catch (Throwable t) { @@ -153,7 +149,8 @@ public class InternalSchemaInitializer extends Thread { } } - private void createTbl() throws UserException { + @VisibleForTesting + public static void createTbl() throws UserException { // statistics Env.getCurrentEnv().getInternalCatalog().createTable(buildStatisticsTblStmt()); Env.getCurrentEnv().getInternalCatalog().createTable(buildHistogramTblStmt()); @@ -162,7 +159,7 @@ public class InternalSchemaInitializer extends Thread { } @VisibleForTesting - public static void createDB() { + public static void createDb() { CreateDbStmt createDbStmt = new CreateDbStmt(true, FeConstants.INTERNAL_DB_NAME, null); try { @@ -173,27 +170,9 @@ public class InternalSchemaInitializer extends Thread { } } - @VisibleForTesting - public CreateTableStmt buildStatisticsTblStmt() throws UserException { + private static CreateTableStmt buildStatisticsTblStmt() throws UserException { TableName tableName = new TableName("", FeConstants.INTERNAL_DB_NAME, StatisticConstants.STATISTIC_TBL_NAME); - List<ColumnDef> columnDefs = new ArrayList<>(); - columnDefs.add(new ColumnDef("id", TypeDef.createVarchar(StatisticConstants.ID_LEN))); - columnDefs.add(new ColumnDef("catalog_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); - columnDefs.add(new ColumnDef("db_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); - columnDefs.add(new ColumnDef("tbl_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); - columnDefs.add(new ColumnDef("idx_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); - columnDefs.add(new ColumnDef("col_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); - ColumnDef partId = new ColumnDef("part_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)); - partId.setAllowNull(true); - columnDefs.add(partId); - columnDefs.add(new ColumnDef("count", TypeDef.create(PrimitiveType.BIGINT), true)); - columnDefs.add(new ColumnDef("ndv", TypeDef.create(PrimitiveType.BIGINT), true)); - columnDefs.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT), true)); - columnDefs.add(new ColumnDef("min", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true)); - columnDefs.add(new ColumnDef("max", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true)); - columnDefs.add(new ColumnDef("data_size_in_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); - columnDefs.add(new ColumnDef("update_time", TypeDef.create(PrimitiveType.DATETIME))); String engineName = "olap"; ArrayList<String> uniqueKeys = Lists.newArrayList("id", "catalog_id", "db_id", "tbl_id", "idx_id", "col_id", "part_id"); @@ -207,26 +186,16 @@ public class InternalSchemaInitializer extends Thread { } }; CreateTableStmt createTableStmt = new CreateTableStmt(true, false, - tableName, columnDefs, engineName, keysDesc, null, distributionDesc, + tableName, InternalSchema.getCopiedSchema(StatisticConstants.STATISTIC_TBL_NAME), + engineName, keysDesc, null, distributionDesc, properties, null, "Doris internal statistics table, DO NOT MODIFY IT", null); StatisticsUtil.analyze(createTableStmt); return createTableStmt; } - @VisibleForTesting - public CreateTableStmt buildHistogramTblStmt() throws UserException { + private static CreateTableStmt buildHistogramTblStmt() throws UserException { TableName tableName = new TableName("", FeConstants.INTERNAL_DB_NAME, StatisticConstants.HISTOGRAM_TBL_NAME); - List<ColumnDef> columnDefs = new ArrayList<>(); - columnDefs.add(new ColumnDef("id", TypeDef.createVarchar(StatisticConstants.ID_LEN))); - columnDefs.add(new ColumnDef("catalog_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); - columnDefs.add(new ColumnDef("db_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); - columnDefs.add(new ColumnDef("tbl_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); - columnDefs.add(new ColumnDef("idx_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); - columnDefs.add(new ColumnDef("col_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN))); - columnDefs.add(new ColumnDef("sample_rate", TypeDef.create(PrimitiveType.DOUBLE))); - columnDefs.add(new ColumnDef("buckets", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))); - columnDefs.add(new ColumnDef("update_time", TypeDef.create(PrimitiveType.DATETIME))); String engineName = "olap"; ArrayList<String> uniqueKeys = Lists.newArrayList("id", "catalog_id", "db_id", "tbl_id", "idx_id", "col_id"); @@ -240,13 +209,14 @@ public class InternalSchemaInitializer extends Thread { } }; CreateTableStmt createTableStmt = new CreateTableStmt(true, false, - tableName, columnDefs, engineName, keysDesc, null, distributionDesc, + tableName, InternalSchema.getCopiedSchema(StatisticConstants.HISTOGRAM_TBL_NAME), + engineName, keysDesc, null, distributionDesc, properties, null, "Doris internal statistics table, DO NOT MODIFY IT", null); StatisticsUtil.analyze(createTableStmt); return createTableStmt; } - private CreateTableStmt buildAuditTblStmt() throws UserException { + private static CreateTableStmt buildAuditTblStmt() throws UserException { TableName tableName = new TableName("", FeConstants.INTERNAL_DB_NAME, AuditLoaderPlugin.AUDIT_LOG_TABLE); @@ -271,7 +241,8 @@ public class InternalSchemaInitializer extends Thread { } }; CreateTableStmt createTableStmt = new CreateTableStmt(true, false, - tableName, AUDIT_TABLE_COLUMNS, engineName, keysDesc, partitionDesc, distributionDesc, + tableName, InternalSchema.getCopiedSchema(AuditLoaderPlugin.AUDIT_LOG_TABLE), + engineName, keysDesc, partitionDesc, distributionDesc, properties, null, "Doris internal audit table, DO NOT MODIFY IT", null); StatisticsUtil.analyze(createTableStmt); return createTableStmt; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index 533f50f062d..148fc460d5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -161,6 +161,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { logBuffer.append(event.sqlHash).append("\t"); logBuffer.append(event.sqlDigest).append("\t"); logBuffer.append(event.peakMemoryBytes).append("\t"); + logBuffer.append(event.workloadGroup).append("\t"); // trim the query to avoid too long // use `getBytes().length` to get real byte length String stmt = truncateByBytes(event.stmt).replace("\n", " ") diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java index 07175478275..72b046f015a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java @@ -17,7 +17,7 @@ package org.apache.doris.plugin.audit; -import org.apache.doris.catalog.InternalSchemaInitializer; +import org.apache.doris.catalog.InternalSchema; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; @@ -48,7 +48,7 @@ public class AuditStreamLoader { this.auditLogTbl = AuditLoaderPlugin.AUDIT_LOG_TABLE; this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, auditLogTbl); // currently, FE identity is FE's IP, so we replace the "." in IP to make it suitable for label - this.feIdentity = hostPort.replaceAll("\\.", "_"); + this.feIdentity = hostPort.replaceAll("\\.", "_").replaceAll(":", "_"); } private HttpURLConnection getConnection(String urlStr, String label, String clusterToken) throws IOException { @@ -63,7 +63,7 @@ public class AuditStreamLoader { conn.addRequestProperty("label", label); conn.addRequestProperty("max_filter_ratio", "1.0"); conn.addRequestProperty("columns", - InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> c.getName()).collect( + InternalSchema.AUDIT_SCHEMA.stream().map(c -> c.getName()).collect( Collectors.joining(","))); conn.setDoOutput(true); conn.setDoInput(true); @@ -78,7 +78,7 @@ public class AuditStreamLoader { sb.append("-H \"").append("Content-Type\":").append("\"text/plain; charset=UTF-8\" \\\n "); sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n "); sb.append("-H \"").append("columns\":") - .append("\"" + InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> c.getName()).collect( + .append("\"" + InternalSchema.AUDIT_SCHEMA.stream().map(c -> c.getName()).collect( Collectors.joining(",")) + "\" \\\n "); sb.append("\"").append(conn.getURL()).append("\""); return sb.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 85753a7330a..86e88bb2b20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1090,6 +1090,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); + } else { + checkToken(request.getToken()); } // check label @@ -1111,9 +1113,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP); // begin long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; + TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, clientIp); + if (request.isSetToken()) { + txnCoord.isFromInternal = true; + } long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(), - new TxnCoordinator(TxnSourceType.BE, clientIp), + txnCoord, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); TLoadTxnBeginResult result = new TLoadTxnBeginResult(); result.setTxnId(txnId).setDbId(db.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 9c1e196923f..feb3ac81470 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -416,6 +416,19 @@ public class SystemInfoService { return idToBackendRef.values().stream().filter(backend -> backend.isComputeNode()).collect(Collectors.toList()); } + // return num of backends that from different hosts + public int getBackendNumFromDiffHosts(boolean aliveOnly) { + Set<String> hosts = Sets.newHashSet(); + ImmutableMap<Long, Backend> idToBackend = idToBackendRef; + for (Backend backend : idToBackend.values()) { + if (aliveOnly && !backend.isAlive()) { + continue; + } + hosts.add(backend.getHost()); + } + return hosts.size(); + } + class BeIdComparator implements Comparator<Backend> { public int compare(Backend a, Backend b) { return (int) (a.getId() - b.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 43233002935..e780bc02472 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -321,7 +321,9 @@ public class DatabaseTransactionMgr { throws DuplicatedRequestException, LabelAlreadyUsedException, BeginTransactionException, AnalysisException, QuotaExceedException, MetaNotFoundException { Database db = env.getInternalCatalog().getDbOrMetaException(dbId); - InternalDatabaseUtil.checkDatabase(db.getFullName(), ConnectContext.get()); + if (!coordinator.isFromInternal) { + InternalDatabaseUtil.checkDatabase(db.getFullName(), ConnectContext.get()); + } checkDatabaseDataQuota(); Preconditions.checkNotNull(coordinator); Preconditions.checkNotNull(label); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index dbe63a0c79d..b7401416676 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -164,6 +164,9 @@ public class TransactionState implements Writable { public TxnSourceType sourceType; @SerializedName(value = "ip") public String ip; + // True if this txn if created by system(such as writing data to audit table) + @SerializedName(value = "ii") + public boolean isFromInternal = false; public TxnCoordinator() { } diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java new file mode 100644 index 00000000000..2898e3a1c35 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.alter; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.InternalSchemaInitializer; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.plugin.audit.AuditLoaderPlugin; +import org.apache.doris.statistics.StatisticConstants; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class InternalSchemaAlterTest extends TestWithFeService { + + @Override + protected int backendNum() { + return 3; + } + + @Override + protected void runBeforeAll() throws Exception { + InternalSchemaInitializer.createDb(); + InternalSchemaInitializer.createTbl(); + Config.allow_replica_on_same_host = true; + FeConstants.runningUnitTest = true; + } + + @Test + public void testModifyTblReplicaCount() throws AnalysisException { + Database db = Env.getCurrentEnv().getCatalogMgr() + .getInternalCatalog().getDbNullable(FeConstants.INTERNAL_DB_NAME); + InternalSchemaInitializer.modifyTblReplicaCount(db, StatisticConstants.STATISTIC_TBL_NAME); + InternalSchemaInitializer.modifyTblReplicaCount(db, StatisticConstants.HISTOGRAM_TBL_NAME); + InternalSchemaInitializer.modifyTblReplicaCount(db, AuditLoaderPlugin.AUDIT_LOG_TABLE); + + checkReplicationNum(db, StatisticConstants.STATISTIC_TBL_NAME); + checkReplicationNum(db, StatisticConstants.HISTOGRAM_TBL_NAME); + checkReplicationNum(db, AuditLoaderPlugin.AUDIT_LOG_TABLE); + } + + private void checkReplicationNum(Database db, String tblName) throws AnalysisException { + OlapTable olapTable = db.getOlapTableOrAnalysisException(tblName); + PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + Assertions.assertEquals((short) 3, olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum(), + tblName); + for (Partition partition : olapTable.getPartitions()) { + Assertions.assertEquals((short) 3, + partitionInfo.getReplicaAllocation(partition.getId()).getTotalReplicaNum()); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java index 1e187d74671..b17ba3e68db 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -54,7 +54,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService { @Override protected void runBeforeAll() throws Exception { try { - InternalSchemaInitializer.createDB(); + InternalSchemaInitializer.createDb(); createDatabase("analysis_job_test"); connectContext.setDatabase("analysis_job_test"); createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n" diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java index 85fa42071e9..483cd3c0326 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java @@ -55,7 +55,7 @@ public class AnalyzeTest extends TestWithFeService { @Override protected void runBeforeAll() throws Exception { try { - InternalSchemaInitializer.createDB(); + InternalSchemaInitializer.createDb(); createDatabase("analysis_job_test"); connectContext.setDatabase("analysis_job_test"); createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n" diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 1f57040d305..65070dd504b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -188,6 +188,10 @@ public abstract class TestWithFeService { return 1; } + protected boolean needDiffHost() { + return false; + } + // Help to create a mocked ConnectContext. public static ConnectContext createDefaultCtx() throws IOException { return createCtx(UserIdentity.ROOT, "127.0.0.1"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org