This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 77e55bc7496 [feature][insert]Adapt the create table statement to the nereids sql (#32458)
77e55bc7496 is described below

commit 77e55bc74966acf7b4ba8e62c042124ca5a2cbb4
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Wed Mar 20 20:47:18 2024 +0800

    [feature][insert]Adapt the create table statement to the nereids sql (#32458)

    issue: #31442

    1. Adapt the create table statement from Doris to Hive.
    2. Fix insert overwrite for the table sink.

    > The Doris create hive table statement:
    ```
    mysql> CREATE TABLE buck2(
        ->     id int COMMENT 'col1',
        ->     name string COMMENT 'col2',
        ->     dt string COMMENT 'part1',
        ->     dtm string COMMENT 'part2'
        -> ) ENGINE=hive
        -> COMMENT "create tbl"
        -> PARTITION BY LIST (dt, dtm) ()
        -> DISTRIBUTED BY HASH (id) BUCKETS 16
        -> PROPERTIES(
        ->     "file_format" = "orc"
        -> );
    ```
    > The generated Hive create table statement:
    ```
    CREATE TABLE `buck2`(
      `id` int COMMENT 'col1',
      `name` string COMMENT 'col2')
    PARTITIONED BY (
      `dt` string,
      `dtm` string)
    CLUSTERED BY (
      id)
    INTO 16 BUCKETS
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
    LOCATION
      'hdfs://HDFS8000871/usr/hive/warehouse/jz3.db/buck2'
    TBLPROPERTIES (
      'transient_lastDdlTime'='1710840747',
      'doris.file_format'='orc')
    ```
---
 .../main/java/org/apache/doris/common/Config.java  |   5 +
 .../datasource/hive/HiveMetaStoreClientHelper.java |  16 ++
 .../doris/datasource/hive/HiveMetadataOps.java     |  88 ++++++++---
 .../doris/datasource/hive/HiveTableMetadata.java   |  59 ++++++--
 .../datasource/hive/ThriftHMSCachedClient.java     |  40 +++--
 .../doris/insertoverwrite/InsertOverwriteUtil.java |  45 +++---
 .../commands/insert/HiveInsertCommandContext.java  |   2 +-
 .../insert/InsertOverwriteTableCommand.java        |  92 +++++++----
 .../trees/plans/physical/PhysicalTableSink.java    |   3 +
 .../doris/datasource/hive/HiveMetadataOpsTest.java | 162 +++++++++++++++++++++
 .../doris/datasource/hive/HmsCommitTest.java       |   5 +-
 11 files changed, 409 insertions(+), 108 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index daa9c8b1d35..9783c30ad91 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2216,6 +2216,11 @@ public class Config extends ConfigBase { "Enable external table DDL"}) public static boolean enable_external_ddl = false; + @ConfField(mutable = true, masterOnly = true, description = { + "启用Hive分桶表", + "Enable external hive bucket table"}) + public static boolean enable_create_hive_bucket_table = false; + @ConfField(mutable = true, masterOnly = true, description = { "Hive创建外部表默认指定的文件格式", "Default hive file format for creating table."}) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java index 23c83a11146..bbb6129e2c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java @@ -606,6 +606,22 @@ public class HiveMetaStoreClientHelper { return "double"; } else if (dorisType.equals(Type.STRING)) { return "string"; + } else if (dorisType.equals(Type.DEFAULT_DECIMALV3)) {
StringBuilder decimalType = new StringBuilder(); + decimalType.append("decimal"); + ScalarType scalarType = (ScalarType) dorisType; + int precision = scalarType.getScalarPrecision(); + if (precision == 0) { + precision = ScalarType.DEFAULT_PRECISION; + } + // decimal(precision, scale) + int scale = scalarType.getScalarScale(); + decimalType.append("("); + decimalType.append(precision); + decimalType.append(","); + decimalType.append(scale); + decimalType.append(")"); + return decimalType.toString(); } throw new HMSClientException("Unsupported type conversion of " + dorisType.toSql()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index 9279c48fbaa..a182aa9cc00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -19,8 +19,10 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DistributionDesc; import org.apache.doris.analysis.DropDbStmt; import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.analysis.HashDistributionDesc; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.JdbcResource; import org.apache.doris.common.Config; @@ -34,7 +36,9 @@ import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.thrift.THivePartitionUpdate; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; @@ -42,25 +46,32 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; public class HiveMetadataOps implements ExternalMetadataOps { + public static final String LOCATION_URI_KEY = "location_uri"; + public static final String FILE_FORMAT_KEY = "file_format"; + public static final Set<String> DORIS_HIVE_KEYS = ImmutableSet.of(FILE_FORMAT_KEY, LOCATION_URI_KEY); private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class); private static final int MIN_CLIENT_POOL_SIZE = 8; - private JdbcClientConfig jdbcClientConfig; - private HiveConf hiveConf; - private HMSExternalCatalog catalog; - private HMSCachedClient client; + private final HMSCachedClient client; private final RemoteFileSystem fs; + private final HMSExternalCatalog catalog; public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) { + this(catalog, createCachedClient(hiveConf, + Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), + jdbcClientConfig)); + } + + @VisibleForTesting + public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) { this.catalog = catalog; - this.hiveConf = hiveConf; - this.jdbcClientConfig = jdbcClientConfig; - this.client = createCachedClient(hiveConf, - Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig); + this.client = client; this.fs = new 
DFSFileSystem(catalog.getProperties()); } @@ -91,10 +102,11 @@ public class HiveMetadataOps implements ExternalMetadataOps { try { HiveDatabaseMetadata catalogDatabase = new HiveDatabaseMetadata(); catalogDatabase.setDbName(fullDbName); - catalogDatabase.setProperties(properties); - if (properties.containsKey("location_uri")) { - catalogDatabase.setLocationUri(properties.get("location_uri")); + if (properties.containsKey(LOCATION_URI_KEY)) { + catalogDatabase.setLocationUri(properties.get(LOCATION_URI_KEY)); } + properties.remove(LOCATION_URI_KEY); + catalogDatabase.setProperties(properties); catalogDatabase.setComment(properties.getOrDefault("comment", "")); client.createDatabase(catalogDatabase); catalog.onRefresh(true); @@ -124,16 +136,50 @@ public class HiveMetadataOps implements ExternalMetadataOps { throw new UserException("Failed to get database: '" + dbName + "' in catalog: " + catalog.getName()); } try { - Map<String, String> props = stmt.getExtProperties(); - String fileFormat = props.getOrDefault("file_format", Config.hive_default_file_format); - HiveTableMetadata catalogTable = HiveTableMetadata.of(dbName, - tblName, - stmt.getColumns(), - parsePartitionKeys(props), - props, - fileFormat); - - client.createTable(catalogTable, stmt.isSetIfNotExists()); + Map<String, String> props = stmt.getProperties(); + String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, Config.hive_default_file_format); + Map<String, String> ddlProps = new HashMap<>(); + for (Map.Entry<String, String> entry : props.entrySet()) { + String key = entry.getKey().toLowerCase(); + if (DORIS_HIVE_KEYS.contains(entry.getKey().toLowerCase())) { + ddlProps.put("doris." + key, entry.getValue()); + } else { + ddlProps.put(key, entry.getValue()); + } + } + List<String> partitionColNames = new ArrayList<>(); + if (stmt.getPartitionDesc() != null) { + partitionColNames.addAll(stmt.getPartitionDesc().getPartitionColNames()); + } + HiveTableMetadata hiveTableMeta; + DistributionDesc bucketInfo = stmt.getDistributionDesc(); + if (bucketInfo != null) { + if (Config.enable_create_hive_bucket_table) { + if (bucketInfo instanceof HashDistributionDesc) { + hiveTableMeta = HiveTableMetadata.of(dbName, + tblName, + stmt.getColumns(), + partitionColNames, + ((HashDistributionDesc) bucketInfo).getDistributionColumnNames(), + bucketInfo.getBuckets(), + ddlProps, + fileFormat); + } else { + throw new UserException("External hive table only supports hash bucketing"); + } + } else { + throw new UserException("Create hive bucket table need" + + " set enable_create_hive_bucket_table to true"); + } + } else { + hiveTableMeta = HiveTableMetadata.of(dbName, + tblName, + stmt.getColumns(), + partitionColNames, + ddlProps, + fileFormat); + } + client.createTable(hiveTableMeta, stmt.isSetIfNotExists()); db.setUnInitialized(true); } catch (Exception e) { throw new UserException(e.getMessage(), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java index fde0a2d4d04..d8de9d60734 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java @@ -20,32 +20,45 @@ package org.apache.doris.datasource.hive; import org.apache.doris.catalog.Column; import org.apache.doris.datasource.TableMetadata; -import org.apache.hadoop.hive.metastore.api.FieldSchema; - +import java.util.ArrayList; import java.util.List; import 
java.util.Map; public class HiveTableMetadata implements TableMetadata { - private String dbName; - private String tableName; - private List<Column> columns; - private List<FieldSchema> partitionKeys; - private String fileFormat; - private Map<String, String> properties; + private final String dbName; + private final String tableName; + private final List<Column> columns; + private final List<String> partitionKeys; + private final String fileFormat; + private final Map<String, String> properties; + private List<String> bucketCols; + private int numBuckets; // private String viewSql; public HiveTableMetadata(String dbName, String tblName, List<Column> columns, - List<FieldSchema> partitionKeys, + List<String> partitionKeys, + Map<String, String> props, + String fileFormat) { + this(dbName, tblName, columns, partitionKeys, new ArrayList<>(), 0, props, fileFormat); + } + + public HiveTableMetadata(String dbName, String tableName, + List<Column> columns, + List<String> partitionKeys, + List<String> bucketCols, + int numBuckets, Map<String, String> props, String fileFormat) { this.dbName = dbName; - this.tableName = tblName; + this.tableName = tableName; this.columns = columns; this.partitionKeys = partitionKeys; - this.fileFormat = fileFormat; + this.bucketCols = bucketCols; + this.numBuckets = numBuckets; this.properties = props; + this.fileFormat = fileFormat; } @Override @@ -67,7 +80,7 @@ public class HiveTableMetadata implements TableMetadata { return columns; } - public List<FieldSchema> getPartitionKeys() { + public List<String> getPartitionKeys() { return partitionKeys; } @@ -75,12 +88,32 @@ public class HiveTableMetadata implements TableMetadata { return fileFormat; } + public List<String> getBucketCols() { + return bucketCols; + } + + public int getNumBuckets() { + return numBuckets; + } + public static HiveTableMetadata of(String dbName, String tblName, List<Column> columns, - List<FieldSchema> partitionKeys, + List<String> partitionKeys, Map<String, String> props, String fileFormat) { return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props, fileFormat); } + + public static HiveTableMetadata of(String dbName, + String tblName, + List<Column> columns, + List<String> partitionKeys, + List<String> bucketCols, + int numBuckets, + Map<String, String> props, + String fileFormat) { + return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, + bucketCols, numBuckets, props, fileFormat); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index d4f63c5a8fb..b5b1147447e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -20,6 +20,7 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.datasource.DatabaseMetadata; import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; @@ -71,12 +72,14 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; import 
java.util.Optional; import java.util.Queue; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -199,11 +202,14 @@ public class ThriftHMSCachedClient implements HMSCachedClient { table.setCreateTime(createTime); table.setLastAccessTime(createTime); // table.setRetention(0); - String location = hiveTable.getProperties().get("external_location"); - table.setSd(toHiveStorageDesc(hiveTable.getColumns(), - hiveTable.getFileFormat(), - location)); - table.setPartitionKeys(hiveTable.getPartitionKeys()); + String location = hiveTable.getProperties().get(HiveMetadataOps.LOCATION_URI_KEY); + Set<String> partitionSet = new HashSet<>(hiveTable.getPartitionKeys()); + Pair<List<FieldSchema>, List<FieldSchema>> hiveSchema = toHiveSchema(hiveTable.getColumns(), partitionSet); + + table.setSd(toHiveStorageDesc(hiveSchema.first, hiveTable.getBucketCols(), hiveTable.getNumBuckets(), + hiveTable.getFileFormat(), location)); + table.setPartitionKeys(hiveSchema.second); + // table.setViewOriginalText(hiveTable.getViewSql()); // table.setViewExpandedText(hiveTable.getViewSql()); table.setTableType("MANAGED_TABLE"); @@ -211,13 +217,19 @@ public class ThriftHMSCachedClient implements HMSCachedClient { return table; } - private static StorageDescriptor toHiveStorageDesc(List<Column> columns, String fileFormat, String location) { + private static StorageDescriptor toHiveStorageDesc(List<FieldSchema> columns, + List<String> bucketCols, + int numBuckets, + String fileFormat, + String location) { StorageDescriptor sd = new StorageDescriptor(); - sd.setCols(toHiveColumns(columns)); + sd.setCols(columns); setFileFormat(fileFormat, sd); if (StringUtils.isNotEmpty(location)) { sd.setLocation(location); } + sd.setBucketCols(bucketCols); + sd.setNumBuckets(numBuckets); Map<String, String> parameters = new HashMap<>(); parameters.put("tag", "doris external hive talbe"); sd.setParameters(parameters); @@ -246,17 +258,23 @@ public class ThriftHMSCachedClient implements HMSCachedClient { sd.setOutputFormat(outputFormat); } - private static List<FieldSchema> toHiveColumns(List<Column> columns) { - List<FieldSchema> result = new ArrayList<>(); + private static Pair<List<FieldSchema>, List<FieldSchema>> toHiveSchema(List<Column> columns, + Set<String> partitionSet) { + List<FieldSchema> hiveCols = new ArrayList<>(); + List<FieldSchema> hiveParts = new ArrayList<>(); for (Column column : columns) { FieldSchema hiveFieldSchema = new FieldSchema(); // TODO: add doc, just support doris type hiveFieldSchema.setType(HiveMetaStoreClientHelper.dorisTypeToHiveType(column.getType())); hiveFieldSchema.setName(column.getName()); hiveFieldSchema.setComment(column.getComment()); - result.add(hiveFieldSchema); + if (partitionSet.contains(column.getName())) { + hiveParts.add(hiveFieldSchema); + } else { + hiveCols.add(hiveFieldSchema); + } } - return result; + return Pair.of(hiveCols, hiveParts); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java index 54f9895ab2c..c4d3068e09f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java @@ -24,6 +24,7 @@ import org.apache.doris.analysis.ReplacePartitionClause; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import 
org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.PropertyAnalyzer; @@ -42,16 +43,18 @@ public class InsertOverwriteUtil { /** * add temp partitions * - * @param olapTable + * @param tableIf * @param partitionNames * @param tempPartitionNames * @throws DdlException */ - public static void addTempPartitions(OlapTable olapTable, List<String> partitionNames, - List<String> tempPartitionNames) throws DdlException { - for (int i = 0; i < partitionNames.size(); i++) { - Env.getCurrentEnv().addPartitionLike((Database) olapTable.getDatabase(), olapTable.getName(), - new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true)); + public static void addTempPartitions(TableIf tableIf, List<String> partitionNames, + List<String> tempPartitionNames) throws DdlException { + if (tableIf instanceof OlapTable) { + for (int i = 0; i < partitionNames.size(); i++) { + Env.getCurrentEnv().addPartitionLike((Database) tableIf.getDatabase(), tableIf.getName(), + new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true)); + } } } @@ -63,23 +66,25 @@ public class InsertOverwriteUtil { * @param tempPartitionNames * @throws DdlException */ - public static void replacePartition(OlapTable olapTable, List<String> partitionNames, + public static void replacePartition(TableIf olapTable, List<String> partitionNames, List<String> tempPartitionNames) throws DdlException { - try { - if (!olapTable.writeLockIfExist()) { - return; + if (olapTable instanceof OlapTable) { + try { + if (!olapTable.writeLockIfExist()) { + return; + } + Map<String, String> properties = Maps.newHashMap(); + properties.put(PropertyAnalyzer.PROPERTIES_USE_TEMP_PARTITION_NAME, "false"); + ReplacePartitionClause replacePartitionClause = new ReplacePartitionClause( + new PartitionNames(false, partitionNames), + new PartitionNames(true, tempPartitionNames), properties); + Env.getCurrentEnv() + .replaceTempPartition((Database) olapTable.getDatabase(), + (OlapTable) olapTable, replacePartitionClause); + } finally { + olapTable.writeUnlock(); } - Map<String, String> properties = Maps.newHashMap(); - properties.put(PropertyAnalyzer.PROPERTIES_USE_TEMP_PARTITION_NAME, "false"); - ReplacePartitionClause replacePartitionClause = new ReplacePartitionClause( - new PartitionNames(false, partitionNames), - new PartitionNames(true, tempPartitionNames), properties); - Env.getCurrentEnv() - .replaceTempPartition((Database) olapTable.getDatabase(), olapTable, replacePartitionClause); - } finally { - olapTable.writeUnlock(); } - } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java index 9e4c2bc92a3..31e56fd6ccc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java @@ -21,7 +21,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; * For Hive Table */ public class HiveInsertCommandContext extends InsertCommandContext { - private boolean overwrite = true; + private boolean overwrite = false; public boolean isOverwrite() { return overwrite; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 0741982c968..44c17545be5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -42,6 +42,7 @@ import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalTableSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; @@ -54,6 +55,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -88,7 +90,8 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS } public boolean isAutoDetectOverwrite() { - return ((UnboundTableSink<?>) this.logicalQuery).isAutoDetectPartition(); + return (logicalQuery instanceof UnboundTableSink) + && ((UnboundTableSink<?>) this.logicalQuery).isAutoDetectPartition(); } @Override @@ -118,27 +121,31 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS } Optional<TreeNode<?>> plan = (planner.getPhysicalPlan() - .<Set<TreeNode<?>>>collect(node -> node instanceof PhysicalOlapTableSink)).stream().findAny(); + .<Set<TreeNode<?>>>collect(node -> node instanceof PhysicalTableSink)).stream().findAny(); Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode"); - PhysicalOlapTableSink<?> physicalOlapTableSink = ((PhysicalOlapTableSink<?>) plan.get()); - OlapTable targetTable = physicalOlapTableSink.getTargetTable(); - InternalDatabaseUtil - .checkDatabase(targetTable.getQualifiedDbName(), ConnectContext.get()); - // check auth - if (!Env.getCurrentEnv().getAccessManager() - .checkTblPriv(ConnectContext.get(), targetTable.getQualifiedDbName(), targetTable.getName(), - PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), - targetTable.getQualifiedDbName() + ": " + targetTable.getName()); - } - - ConnectContext.get().setSkipAuth(true); - List<String> partitionNames = ((UnboundTableSink<?>) logicalQuery).getPartitions(); - if (CollectionUtils.isEmpty(partitionNames)) { - partitionNames = Lists.newArrayList(targetTable.getPartitionNames()); + PhysicalTableSink<?> physicalTableSink = ((PhysicalTableSink<?>) plan.get()); + TableIf targetTable = physicalTableSink.getTargetTable(); + List<String> partitionNames; + if (physicalTableSink instanceof PhysicalOlapTableSink) { + InternalDatabaseUtil + .checkDatabase(((OlapTable) targetTable).getQualifiedDbName(), ConnectContext.get()); + // check auth + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(ConnectContext.get(), ((OlapTable) targetTable).getQualifiedDbName(), + targetTable.getName(), PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + 
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + ((OlapTable) targetTable).getQualifiedDbName() + ": " + targetTable.getName()); + } + ConnectContext.get().setSkipAuth(true); + partitionNames = ((UnboundTableSink<?>) logicalQuery).getPartitions(); + if (CollectionUtils.isEmpty(partitionNames)) { + partitionNames = Lists.newArrayList(targetTable.getPartitionNames()); + } + } else { + // Do not create temp partition on FE + partitionNames = new ArrayList<>(); } - long taskId = 0; try { if (isAutoDetectOverwrite()) { @@ -170,6 +177,18 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS } } + private void runInsertCommand(LogicalPlan logicalQuery, InsertCommandContext insertCtx, + ConnectContext ctx, StmtExecutor executor) throws Exception { + InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(logicalQuery, labelName, + Optional.of(insertCtx)); + insertCommand.run(ctx, executor); + if (ctx.getState().getStateType() == MysqlStateType.ERR) { + String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); + LOG.warn("InsertInto state error:{}", errMsg); + throw new UserException(errMsg); + } + } + /** * insert into select. for sepecified temp partitions * @@ -208,18 +227,11 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); insertCtx = new HiveInsertCommandContext(); - ((HiveInsertCommandContext) insertCtx).setOverwrite(false); + ((HiveInsertCommandContext) insertCtx).setOverwrite(true); } else { - throw new RuntimeException("Current catalog does not support insert overwrite yet."); - } - InsertIntoTableCommand insertCommand = - new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx)); - insertCommand.run(ctx, executor); - if (ctx.getState().getStateType() == MysqlStateType.ERR) { - String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); - LOG.warn("InsertInto state error:{}", errMsg); - throw new UserException(errMsg); + throw new UserException("Current catalog does not support insert overwrite yet."); } + runInsertCommand(copySink, insertCtx, ctx, executor); } /** @@ -229,17 +241,19 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS * @param executor executor */ private void insertInto(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception { - UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery; // 1. for overwrite situation, we disable auto create partition. - // 2. we save and pass overwrite auto detect by insertCtx - OlapInsertCommandContext insertCtx = new OlapInsertCommandContext(false, sink.isAutoDetectPartition(), groupId); - InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(sink, labelName, Optional.of(insertCtx)); - insertCommand.run(ctx, executor); - if (ctx.getState().getStateType() == MysqlStateType.ERR) { - String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); - LOG.warn("InsertInto state error:{}", errMsg); - throw new UserException(errMsg); + // 2. 
we save and pass overwrite auto-detected by insertCtx + InsertCommandContext insertCtx; + if (logicalQuery instanceof UnboundTableSink) { + insertCtx = new OlapInsertCommandContext(false, + ((UnboundTableSink<?>) logicalQuery).isAutoDetectPartition(), groupId); + } else if (logicalQuery instanceof UnboundHiveTableSink) { + insertCtx = new HiveInsertCommandContext(); + ((HiveInsertCommandContext) insertCtx).setOverwrite(true); + } else { + throw new UserException("Current catalog does not support insert overwrite yet."); } + runInsertCommand(logicalQuery, insertCtx, ctx, executor); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java index 7feb53e24b0..1461c0626d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.catalog.TableIf; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; @@ -44,4 +45,6 @@ public abstract class PhysicalTableSink<CHILD_TYPE extends Plan> extends Physica } public abstract PhysicalProperties getRequirePhysicalProperties(); + + public abstract TableIf getTargetTable(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java new file mode 100644 index 00000000000..af57aae703b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.hive; + +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DistributionDesc; +import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.analysis.HashDistributionDesc; +import org.apache.doris.analysis.KeysDesc; +import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.DatabaseMetadata; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.TableMetadata; + +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class HiveMetadataOpsTest { + + private HiveMetadataOps metadataOps; + + @Mocked + private HMSCachedClient mockedClient; + @Mocked + private HMSExternalCatalog mockedCatalog; + + @BeforeEach + public void init() { + metadataOps = new HiveMetadataOps(mockedCatalog, mockedClient); + new MockUp<HMSExternalCatalog>(HMSExternalCatalog.class) { + @Mock + public ExternalDatabase<? extends ExternalTable> getDbNullable(String dbName) { + return new HMSExternalDatabase(mockedCatalog, 0L, "mockedDb"); + } + + @Mock + public void onRefresh(boolean invalidCache) { + // mocked + } + }; + new MockUp<HMSCachedClient>(HMSCachedClient.class) { + @Mock + public void createDatabase(DatabaseMetadata catalogDatabase) { + // mocked + } + + @Mock + public void dropDatabase(String dbName) { + // mocked + } + + @Mock + public void dropTable(String dbName, String tableName) { + // mocked + } + + @Mock + public void createTable(TableMetadata catalogTable, boolean ignoreIfExists) { + // mocked + } + }; + } + + private void createDb(String dbName, Map<String, String> props) throws DdlException { + CreateDbStmt createDbStmt = new CreateDbStmt(true, dbName, props); + metadataOps.createDb(createDbStmt); + } + + private void dropDb(String dbName, boolean forceDrop) throws DdlException { + DropDbStmt dropDbStmt = new DropDbStmt(true, dbName, forceDrop); + metadataOps.dropDb(dropDbStmt); + } + + private void createTable(TableName tableName, + List<Column> cols, + List<String> parts, + List<String> buckets, + Map<String, String> props) + throws UserException { + PartitionDesc partitionDesc = new PartitionDesc(parts, null); + DistributionDesc distributionDesc = null; + if (!buckets.isEmpty()) { + distributionDesc = new HashDistributionDesc(10, buckets); + } + List<String> colsName = cols.stream().map(Column::getName).collect(Collectors.toList()); + CreateTableStmt stmt = new CreateTableStmt(true, false, + tableName, + cols, null, + "hive", + new KeysDesc(KeysType.AGG_KEYS, colsName), + partitionDesc, + distributionDesc, + props, + props, + "comment", + null, null); + metadataOps.createTable(stmt); + } + + private void dropTable(TableName tableName, boolean forceDrop) throws DdlException { + DropTableStmt dropTblStmt = new DropTableStmt(true, tableName, forceDrop); + metadataOps.dropTable(dropTblStmt); + } + + @Test + public void 
testCreateAndDropAll() throws UserException { + Map<String, String> dbProps = new HashMap<>(); + dbProps.put(HiveMetadataOps.LOCATION_URI_KEY, "file://loc/db"); + createDb("mockedDb", dbProps); + Map<String, String> tblProps = new HashMap<>(); + tblProps.put(HiveMetadataOps.FILE_FORMAT_KEY, "orc"); + tblProps.put(HiveMetadataOps.LOCATION_URI_KEY, "file://loc/tbl"); + tblProps.put("fs.defaultFS", "hdfs://ha"); + TableName tableName = new TableName("mockedCtl", "mockedDb", "mockedTbl"); + List<Column> cols = new ArrayList<>(); + cols.add(new Column("id", Type.BIGINT)); + cols.add(new Column("pt", Type.STRING)); + cols.add(new Column("rate", Type.DOUBLE)); + cols.add(new Column("time", Type.DATETIME)); + List<String> parts = new ArrayList<>(); + parts.add("pt"); + List<String> bucks = new ArrayList<>(); + // bucks.add("id"); + createTable(tableName, cols, parts, bucks, tblProps); + dropTable(tableName, true); + dropDb("mockedDb", true); + // TODO: use TestWithFeService to double check plan + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index 2316e65bf60..e5392fb11a8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -24,7 +24,6 @@ import org.apache.doris.thrift.THivePartitionUpdate; import org.apache.doris.thrift.TUpdateMode; import com.google.common.collect.Lists; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.After; @@ -97,8 +96,8 @@ public class HmsCommitTest { List<Column> columns = new ArrayList<>(); columns.add(new Column("c1", PrimitiveType.INT, true)); columns.add(new Column("c2", PrimitiveType.STRING, true)); - List<FieldSchema> partitionKeys = new ArrayList<>(); - partitionKeys.add(new FieldSchema("c3", "string", "comment")); + List<String> partitionKeys = new ArrayList<>(); + partitionKeys.add("c3"); HiveTableMetadata tableMetadata = new HiveTableMetadata( dbName, tbWithPartition, columns, partitionKeys, new HashMap<>(), fileFormat);
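A usage note on the new bucketing gate: the patch rejects bucketed Hive DDL unless the new FE config is enabled, and only hash distribution passes validation (both error messages appear in HiveMetadataOps.createTable above). The sketch below is an editor's illustration, not part of the commit; it assumes an HMS catalog named `hive` and a database `jz3` already exist, and it uses Doris' standard ADMIN SET FRONTEND CONFIG statement, which applies here because the config is declared mutable:

```
-- Illustrative sketch; catalog `hive` and database `jz3` are assumed to exist.
-- Without the config, createTable fails with:
--   "Create hive bucket table need set enable_create_hive_bucket_table to true"
ADMIN SET FRONTEND CONFIG ("enable_create_hive_bucket_table" = "true");

SWITCH hive;
USE jz3;

-- Only HashDistributionDesc is accepted; other distribution types fail with:
--   "External hive table only supports hash bucketing"
CREATE TABLE buck2(
    id int COMMENT 'col1',
    name string COMMENT 'col2',
    dt string COMMENT 'part1',
    dtm string COMMENT 'part2'
) ENGINE=hive
COMMENT "create tbl"
PARTITION BY LIST (dt, dtm) ()
DISTRIBUTED BY HASH (id) BUCKETS 16
PROPERTIES("file_format" = "orc");
```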
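On the insert overwrite half of the change: for a Hive sink, InsertOverwriteTableCommand now builds a HiveInsertCommandContext with overwrite explicitly set to true (the field's default flips to false in this patch) and skips FE-side temp partition handling. A hedged sketch of the statement this path serves, against the table above; the source table `src` is hypothetical:

```
-- Illustrative sketch; `src` is a hypothetical table with a compatible schema.
-- The Hive sink path sets overwrite = true on HiveInsertCommandContext and
-- creates no temp partitions on the FE.
INSERT OVERWRITE TABLE buck2
SELECT id, name, dt, dtm FROM src;
```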