This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 77e55bc7496 [feature][insert]Adapt the create table statement to the nereids sql (#32458)
77e55bc7496 is described below

commit 77e55bc74966acf7b4ba8e62c042124ca5a2cbb4
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Wed Mar 20 20:47:18 2024 +0800

    [feature][insert]Adapt the create table statement to the nereids sql (#32458)
    
    issue: #31442
    
    1. adapt the create table statement from doris to hive
    2. fix insert overwrite for table sink
    
    > The doris create hive table statement:
    
    ```
    mysql> CREATE TABLE buck2(
        ->     id int COMMENT 'col1',
        ->     name string COMMENT 'col2',
        ->     dt string COMMENT 'part1',
        ->     dtm string COMMENT 'part2'
        -> ) ENGINE=hive
        -> COMMENT "create tbl"
        -> PARTITION BY LIST (dt, dtm) ()
        -> DISTRIBUTED BY HASH (id) BUCKETS 16
        -> PROPERTIES(
        ->     "file_format" = "orc"
        -> );
    ```
    
    > generated hive create table statement:
    
    ```
    CREATE TABLE `buck2`(
      `id` int COMMENT 'col1',
      `name` string COMMENT 'col2')
    PARTITIONED BY (
     `dt` string,
     `dtm` string)
    CLUSTERED BY (
      id)
    INTO 16 BUCKETS
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
    LOCATION
      'hdfs://HDFS8000871/usr/hive/warehouse/jz3.db/buck2'
    TBLPROPERTIES (
      'transient_lastDdlTime'='1710840747',
      'doris.file_format'='orc')
    
    ```
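    
    > For context, a hypothetical follow-up session (illustrative only: the
    config knob comes from the Config.java hunk below, and the table and
    values reuse the example above):
    
    ```
    -- the new config is mutable, so it can be enabled at runtime on the master FE;
    -- it must be on before the bucketed (DISTRIBUTED BY) example above can run
    ADMIN SET FRONTEND CONFIG ("enable_create_hive_bucket_table" = "true");
    
    -- exercises the insert overwrite fix through the hive table sink
    INSERT OVERWRITE TABLE buck2 VALUES (1, 'n1', '2024-03-20', '20');
    ```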
---
 .../main/java/org/apache/doris/common/Config.java  |   5 +
 .../datasource/hive/HiveMetaStoreClientHelper.java |  16 ++
 .../doris/datasource/hive/HiveMetadataOps.java     |  88 ++++++++---
 .../doris/datasource/hive/HiveTableMetadata.java   |  59 ++++++--
 .../datasource/hive/ThriftHMSCachedClient.java     |  40 +++--
 .../doris/insertoverwrite/InsertOverwriteUtil.java |  45 +++---
 .../commands/insert/HiveInsertCommandContext.java  |   2 +-
 .../insert/InsertOverwriteTableCommand.java        |  92 +++++++-----
 .../trees/plans/physical/PhysicalTableSink.java    |   3 +
 .../doris/datasource/hive/HiveMetadataOpsTest.java | 162 +++++++++++++++++++++
 .../doris/datasource/hive/HmsCommitTest.java       |   5 +-
 11 files changed, 409 insertions(+), 108 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index daa9c8b1d35..9783c30ad91 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2216,6 +2216,11 @@ public class Config extends ConfigBase {
             "Enable external table DDL"})
     public static boolean enable_external_ddl = false;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "启用Hive分桶表",
+            "Enable external hive bucket table"})
+    public static boolean enable_create_hive_bucket_table = false;
+
     @ConfField(mutable = true, masterOnly = true, description = {
             "Hive创建外部表默认指定的文件格式",
             "Default hive file format for creating table."})
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 23c83a11146..bbb6129e2c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -606,6 +606,22 @@ public class HiveMetaStoreClientHelper {
             return "double";
         } else if (dorisType.equals(Type.STRING)) {
             return "string";
+        } else if (dorisType.equals(Type.DEFAULT_DECIMALV3)) {
+            StringBuilder decimalType = new StringBuilder();
+            decimalType.append("decimal");
+            ScalarType scalarType = (ScalarType) dorisType;
+            int precision = scalarType.getScalarPrecision();
+            if (precision == 0) {
+                precision = ScalarType.DEFAULT_PRECISION;
+            }
+            // decimal(precision, scale)
+            int scale = scalarType.getScalarScale();
+            decimalType.append("(");
+            decimalType.append(precision);
+            decimalType.append(",");
+            decimalType.append(scale);
+            decimalType.append(")");
+            return decimalType.toString();
         }
         throw new HMSClientException("Unsupported type conversion of " + dorisType.toSql());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 9279c48fbaa..a182aa9cc00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -19,8 +19,10 @@ package org.apache.doris.datasource.hive;
 
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.DropDbStmt;
 import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.HashDistributionDesc;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.JdbcResource;
 import org.apache.doris.common.Config;
@@ -34,7 +36,9 @@ import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.thrift.THivePartitionUpdate;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -42,25 +46,32 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 
 public class HiveMetadataOps implements ExternalMetadataOps {
+    public static final String LOCATION_URI_KEY = "location_uri";
+    public static final String FILE_FORMAT_KEY = "file_format";
+    public static final Set<String> DORIS_HIVE_KEYS = ImmutableSet.of(FILE_FORMAT_KEY, LOCATION_URI_KEY);
     private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class);
     private static final int MIN_CLIENT_POOL_SIZE = 8;
-    private JdbcClientConfig jdbcClientConfig;
-    private HiveConf hiveConf;
-    private HMSExternalCatalog catalog;
-    private HMSCachedClient client;
+    private final HMSCachedClient client;
     private final RemoteFileSystem fs;
+    private final HMSExternalCatalog catalog;
 
     public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
+        this(catalog, createCachedClient(hiveConf,
+                Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size),
+                jdbcClientConfig));
+    }
+
+    @VisibleForTesting
+    public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) {
         this.catalog = catalog;
-        this.hiveConf = hiveConf;
-        this.jdbcClientConfig = jdbcClientConfig;
-        this.client = createCachedClient(hiveConf,
-                Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig);
+        this.client = client;
         this.fs = new DFSFileSystem(catalog.getProperties());
     }
 
@@ -91,10 +102,11 @@ public class HiveMetadataOps implements ExternalMetadataOps {
         try {
             HiveDatabaseMetadata catalogDatabase = new HiveDatabaseMetadata();
             catalogDatabase.setDbName(fullDbName);
-            catalogDatabase.setProperties(properties);
-            if (properties.containsKey("location_uri")) {
-                catalogDatabase.setLocationUri(properties.get("location_uri"));
+            if (properties.containsKey(LOCATION_URI_KEY)) {
+                catalogDatabase.setLocationUri(properties.get(LOCATION_URI_KEY));
             }
+            properties.remove(LOCATION_URI_KEY);
+            catalogDatabase.setProperties(properties);
             catalogDatabase.setComment(properties.getOrDefault("comment", ""));
             client.createDatabase(catalogDatabase);
             catalog.onRefresh(true);
@@ -124,16 +136,50 @@ public class HiveMetadataOps implements ExternalMetadataOps {
             throw new UserException("Failed to get database: '" + dbName + "' in catalog: " + catalog.getName());
         }
         try {
-            Map<String, String> props = stmt.getExtProperties();
-            String fileFormat = props.getOrDefault("file_format", Config.hive_default_file_format);
-            HiveTableMetadata catalogTable = HiveTableMetadata.of(dbName,
-                    tblName,
-                    stmt.getColumns(),
-                    parsePartitionKeys(props),
-                    props,
-                    fileFormat);
-
-            client.createTable(catalogTable, stmt.isSetIfNotExists());
+            Map<String, String> props = stmt.getProperties();
+            String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, Config.hive_default_file_format);
+            Map<String, String> ddlProps = new HashMap<>();
+            for (Map.Entry<String, String> entry : props.entrySet()) {
+                String key = entry.getKey().toLowerCase();
+                if (DORIS_HIVE_KEYS.contains(entry.getKey().toLowerCase())) {
+                    ddlProps.put("doris." + key, entry.getValue());
+                } else {
+                    ddlProps.put(key, entry.getValue());
+                }
+            }
+            List<String> partitionColNames = new ArrayList<>();
+            if (stmt.getPartitionDesc() != null) {
+                partitionColNames.addAll(stmt.getPartitionDesc().getPartitionColNames());
+            }
+            HiveTableMetadata hiveTableMeta;
+            DistributionDesc bucketInfo = stmt.getDistributionDesc();
+            if (bucketInfo != null) {
+                if (Config.enable_create_hive_bucket_table) {
+                    if (bucketInfo instanceof HashDistributionDesc) {
+                        hiveTableMeta = HiveTableMetadata.of(dbName,
+                                tblName,
+                                stmt.getColumns(),
+                                partitionColNames,
+                                ((HashDistributionDesc) bucketInfo).getDistributionColumnNames(),
+                                bucketInfo.getBuckets(),
+                                ddlProps,
+                                fileFormat);
+                    } else {
+                        throw new UserException("External hive table only 
supports hash bucketing");
+                    }
+                } else {
+                    throw new UserException("Create hive bucket table need"
+                            + " set enable_create_hive_bucket_table to true");
+                }
+            } else {
+                hiveTableMeta = HiveTableMetadata.of(dbName,
+                        tblName,
+                        stmt.getColumns(),
+                        partitionColNames,
+                        ddlProps,
+                        fileFormat);
+            }
+            client.createTable(hiveTableMeta, stmt.isSetIfNotExists());
             db.setUnInitialized(true);
         } catch (Exception e) {
             throw new UserException(e.getMessage(), e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java
index fde0a2d4d04..d8de9d60734 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java
@@ -20,32 +20,45 @@ package org.apache.doris.datasource.hive;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.datasource.TableMetadata;
 
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 public class HiveTableMetadata implements TableMetadata {
-    private String dbName;
-    private String tableName;
-    private List<Column> columns;
-    private List<FieldSchema> partitionKeys;
-    private String fileFormat;
-    private Map<String, String> properties;
+    private final String dbName;
+    private final String tableName;
+    private final List<Column> columns;
+    private final List<String> partitionKeys;
+    private final String fileFormat;
+    private final Map<String, String> properties;
+    private List<String> bucketCols;
+    private int numBuckets;
     // private String viewSql;
 
     public HiveTableMetadata(String dbName,
                              String tblName,
                              List<Column> columns,
-                             List<FieldSchema> partitionKeys,
+                             List<String> partitionKeys,
+                             Map<String, String> props,
+                             String fileFormat) {
+        this(dbName, tblName, columns, partitionKeys, new ArrayList<>(), 0, props, fileFormat);
+    }
+
+    public HiveTableMetadata(String dbName, String tableName,
+                             List<Column> columns,
+                             List<String> partitionKeys,
+                             List<String> bucketCols,
+                             int numBuckets,
                              Map<String, String> props,
                              String fileFormat) {
         this.dbName = dbName;
-        this.tableName = tblName;
+        this.tableName = tableName;
         this.columns = columns;
         this.partitionKeys = partitionKeys;
-        this.fileFormat = fileFormat;
+        this.bucketCols = bucketCols;
+        this.numBuckets = numBuckets;
         this.properties = props;
+        this.fileFormat = fileFormat;
     }
 
     @Override
@@ -67,7 +80,7 @@ public class HiveTableMetadata implements TableMetadata {
         return columns;
     }
 
-    public List<FieldSchema> getPartitionKeys() {
+    public List<String> getPartitionKeys() {
         return partitionKeys;
     }
 
@@ -75,12 +88,32 @@ public class HiveTableMetadata implements TableMetadata {
         return fileFormat;
     }
 
+    public List<String> getBucketCols() {
+        return bucketCols;
+    }
+
+    public int getNumBuckets() {
+        return numBuckets;
+    }
+
     public static HiveTableMetadata of(String dbName,
                                        String tblName,
                                        List<Column> columns,
-                                       List<FieldSchema> partitionKeys,
+                                       List<String> partitionKeys,
                                        Map<String, String> props,
                                        String fileFormat) {
         return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props, fileFormat);
     }
+
+    public static HiveTableMetadata of(String dbName,
+                                       String tblName,
+                                       List<Column> columns,
+                                       List<String> partitionKeys,
+                                       List<String> bucketCols,
+                                       int numBuckets,
+                                       Map<String, String> props,
+                                       String fileFormat) {
+        return new HiveTableMetadata(dbName, tblName, columns, partitionKeys,
+                bucketCols, numBuckets, props, fileFormat);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index d4f63c5a8fb..b5b1147447e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.hive;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.datasource.DatabaseMetadata;
 import org.apache.doris.datasource.TableMetadata;
 import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -71,12 +72,14 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -199,11 +202,14 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
         table.setCreateTime(createTime);
         table.setLastAccessTime(createTime);
         // table.setRetention(0);
-        String location = hiveTable.getProperties().get("external_location");
-        table.setSd(toHiveStorageDesc(hiveTable.getColumns(),
-                hiveTable.getFileFormat(),
-                location));
-        table.setPartitionKeys(hiveTable.getPartitionKeys());
+        String location = hiveTable.getProperties().get(HiveMetadataOps.LOCATION_URI_KEY);
+        Set<String> partitionSet = new HashSet<>(hiveTable.getPartitionKeys());
+        Pair<List<FieldSchema>, List<FieldSchema>> hiveSchema = toHiveSchema(hiveTable.getColumns(), partitionSet);
+
+        table.setSd(toHiveStorageDesc(hiveSchema.first, hiveTable.getBucketCols(), hiveTable.getNumBuckets(),
+                hiveTable.getFileFormat(), location));
+        table.setPartitionKeys(hiveSchema.second);
+
         // table.setViewOriginalText(hiveTable.getViewSql());
         // table.setViewExpandedText(hiveTable.getViewSql());
         table.setTableType("MANAGED_TABLE");
@@ -211,13 +217,19 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
         return table;
     }
 
-    private static StorageDescriptor toHiveStorageDesc(List<Column> columns, String fileFormat, String location) {
+    private static StorageDescriptor toHiveStorageDesc(List<FieldSchema> columns,
+                                                       List<String> bucketCols,
+                                                       int numBuckets,
+                                                       String fileFormat,
+                                                       String location) {
         StorageDescriptor sd = new StorageDescriptor();
-        sd.setCols(toHiveColumns(columns));
+        sd.setCols(columns);
         setFileFormat(fileFormat, sd);
         if (StringUtils.isNotEmpty(location)) {
             sd.setLocation(location);
         }
+        sd.setBucketCols(bucketCols);
+        sd.setNumBuckets(numBuckets);
         Map<String, String> parameters = new HashMap<>();
         parameters.put("tag", "doris external hive talbe");
         sd.setParameters(parameters);
@@ -246,17 +258,23 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
         sd.setOutputFormat(outputFormat);
     }
 
-    private static List<FieldSchema> toHiveColumns(List<Column> columns) {
-        List<FieldSchema> result = new ArrayList<>();
+    private static Pair<List<FieldSchema>, List<FieldSchema>> toHiveSchema(List<Column> columns,
+                Set<String> partitionSet) {
+        List<FieldSchema> hiveCols = new ArrayList<>();
+        List<FieldSchema> hiveParts = new ArrayList<>();
         for (Column column : columns) {
             FieldSchema hiveFieldSchema = new FieldSchema();
             // TODO: add doc, just support doris type
             hiveFieldSchema.setType(HiveMetaStoreClientHelper.dorisTypeToHiveType(column.getType()));
             hiveFieldSchema.setName(column.getName());
             hiveFieldSchema.setComment(column.getComment());
-            result.add(hiveFieldSchema);
+            if (partitionSet.contains(column.getName())) {
+                hiveParts.add(hiveFieldSchema);
+            } else {
+                hiveCols.add(hiveFieldSchema);
+            }
         }
-        return result;
+        return Pair.of(hiveCols, hiveParts);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
index 54f9895ab2c..c4d3068e09f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.ReplacePartitionClause;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.PropertyAnalyzer;
 
@@ -42,16 +43,18 @@ public class InsertOverwriteUtil {
     /**
      * add temp partitions
      *
-     * @param olapTable
+     * @param tableIf
      * @param partitionNames
      * @param tempPartitionNames
      * @throws DdlException
      */
-    public static void addTempPartitions(OlapTable olapTable, List<String> partitionNames,
-            List<String> tempPartitionNames) throws DdlException {
-        for (int i = 0; i < partitionNames.size(); i++) {
-            Env.getCurrentEnv().addPartitionLike((Database) olapTable.getDatabase(), olapTable.getName(),
-                    new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true));
+    public static void addTempPartitions(TableIf tableIf, List<String> partitionNames,
+                                         List<String> tempPartitionNames) throws DdlException {
+        if (tableIf instanceof OlapTable) {
+            for (int i = 0; i < partitionNames.size(); i++) {
+                Env.getCurrentEnv().addPartitionLike((Database) tableIf.getDatabase(), tableIf.getName(),
+                        new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true));
+            }
         }
     }
 
@@ -63,23 +66,25 @@ public class InsertOverwriteUtil {
      * @param tempPartitionNames
      * @throws DdlException
      */
-    public static void replacePartition(OlapTable olapTable, List<String> partitionNames,
+    public static void replacePartition(TableIf olapTable, List<String> partitionNames,
             List<String> tempPartitionNames) throws DdlException {
-        try {
-            if (!olapTable.writeLockIfExist()) {
-                return;
+        if (olapTable instanceof OlapTable) {
+            try {
+                if (!olapTable.writeLockIfExist()) {
+                    return;
+                }
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(PropertyAnalyzer.PROPERTIES_USE_TEMP_PARTITION_NAME, "false");
+                ReplacePartitionClause replacePartitionClause = new ReplacePartitionClause(
+                        new PartitionNames(false, partitionNames),
+                        new PartitionNames(true, tempPartitionNames), properties);
+                Env.getCurrentEnv()
+                        .replaceTempPartition((Database) olapTable.getDatabase(),
+                                (OlapTable) olapTable, replacePartitionClause);
+            } finally {
+                olapTable.writeUnlock();
             }
-            Map<String, String> properties = Maps.newHashMap();
-            properties.put(PropertyAnalyzer.PROPERTIES_USE_TEMP_PARTITION_NAME, "false");
-            ReplacePartitionClause replacePartitionClause = new ReplacePartitionClause(
-                    new PartitionNames(false, partitionNames),
-                    new PartitionNames(true, tempPartitionNames), properties);
-            Env.getCurrentEnv()
-                    .replaceTempPartition((Database) olapTable.getDatabase(), olapTable, replacePartitionClause);
-        } finally {
-            olapTable.writeUnlock();
         }
-
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
index 9e4c2bc92a3..31e56fd6ccc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
@@ -21,7 +21,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
  * For Hive Table
  */
 public class HiveInsertCommandContext extends InsertCommandContext {
-    private boolean overwrite = true;
+    private boolean overwrite = false;
 
     public boolean isOverwrite() {
         return overwrite;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index 0741982c968..44c17545be5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -42,6 +42,7 @@ import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTableSink;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
@@ -54,6 +55,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -88,7 +90,8 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
     }
 
     public boolean isAutoDetectOverwrite() {
-        return ((UnboundTableSink<?>) this.logicalQuery).isAutoDetectPartition();
+        return (logicalQuery instanceof UnboundTableSink)
+                && ((UnboundTableSink<?>) this.logicalQuery).isAutoDetectPartition();
     }
 
     @Override
@@ -118,27 +121,31 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
         }
 
         Optional<TreeNode<?>> plan = (planner.getPhysicalPlan()
-                .<Set<TreeNode<?>>>collect(node -> node instanceof PhysicalOlapTableSink)).stream().findAny();
+                .<Set<TreeNode<?>>>collect(node -> node instanceof PhysicalTableSink)).stream().findAny();
         Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
-        PhysicalOlapTableSink<?> physicalOlapTableSink = ((PhysicalOlapTableSink<?>) plan.get());
-        OlapTable targetTable = physicalOlapTableSink.getTargetTable();
-        InternalDatabaseUtil
-                .checkDatabase(targetTable.getQualifiedDbName(), ConnectContext.get());
-        // check auth
-        if (!Env.getCurrentEnv().getAccessManager()
-                .checkTblPriv(ConnectContext.get(), targetTable.getQualifiedDbName(), targetTable.getName(),
-                        PrivPredicate.LOAD)) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
-                    ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
-                    targetTable.getQualifiedDbName() + ": " + targetTable.getName());
-        }
-
-        ConnectContext.get().setSkipAuth(true);
-        List<String> partitionNames = ((UnboundTableSink<?>) logicalQuery).getPartitions();
-        if (CollectionUtils.isEmpty(partitionNames)) {
-            partitionNames = Lists.newArrayList(targetTable.getPartitionNames());
+        PhysicalTableSink<?> physicalTableSink = ((PhysicalTableSink<?>) plan.get());
+        TableIf targetTable = physicalTableSink.getTargetTable();
+        List<String> partitionNames;
+        if (physicalTableSink instanceof PhysicalOlapTableSink) {
+            InternalDatabaseUtil
+                    .checkDatabase(((OlapTable) targetTable).getQualifiedDbName(), ConnectContext.get());
+            // check auth
+            if (!Env.getCurrentEnv().getAccessManager()
+                    .checkTblPriv(ConnectContext.get(), ((OlapTable) targetTable).getQualifiedDbName(),
+                            targetTable.getName(), PrivPredicate.LOAD)) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
+                        ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
+                        ((OlapTable) targetTable).getQualifiedDbName() + ": " + targetTable.getName());
+            }
+            ConnectContext.get().setSkipAuth(true);
+            partitionNames = ((UnboundTableSink<?>) logicalQuery).getPartitions();
+            if (CollectionUtils.isEmpty(partitionNames)) {
+                partitionNames = Lists.newArrayList(targetTable.getPartitionNames());
+            }
+        } else {
+            // Do not create temp partition on FE
+            partitionNames = new ArrayList<>();
         }
-
         long taskId = 0;
         try {
             if (isAutoDetectOverwrite()) {
@@ -170,6 +177,18 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
         }
     }
 
+    private void runInsertCommand(LogicalPlan logicalQuery, InsertCommandContext insertCtx,
+                                  ConnectContext ctx, StmtExecutor executor) throws Exception {
+        InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(logicalQuery, labelName,
+                Optional.of(insertCtx));
+        insertCommand.run(ctx, executor);
+        if (ctx.getState().getStateType() == MysqlStateType.ERR) {
+            String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
+            LOG.warn("InsertInto state error:{}", errMsg);
+            throw new UserException(errMsg);
+        }
+    }
+
     /**
     * insert into select. for specified temp partitions
      *
@@ -208,18 +227,11 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
                     sink.getDMLCommandType(),
                     (LogicalPlan) (sink.child(0)));
             insertCtx = new HiveInsertCommandContext();
-            ((HiveInsertCommandContext) insertCtx).setOverwrite(false);
+            ((HiveInsertCommandContext) insertCtx).setOverwrite(true);
         } else {
-            throw new RuntimeException("Current catalog does not support insert overwrite yet.");
-        }
-        InsertIntoTableCommand insertCommand =
-                new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx));
-        insertCommand.run(ctx, executor);
-        if (ctx.getState().getStateType() == MysqlStateType.ERR) {
-            String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
-            LOG.warn("InsertInto state error:{}", errMsg);
-            throw new UserException(errMsg);
+            throw new UserException("Current catalog does not support insert overwrite yet.");
         }
+        runInsertCommand(copySink, insertCtx, ctx, executor);
     }
 
     /**
@@ -229,17 +241,19 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
      * @param executor executor
      */
     private void insertInto(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception {
-        UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
         // 1. for overwrite situation, we disable auto create partition.
-        // 2. we save and pass overwrite auto detect by insertCtx
-        OlapInsertCommandContext insertCtx = new OlapInsertCommandContext(false, sink.isAutoDetectPartition(), groupId);
-        InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(sink, labelName, Optional.of(insertCtx));
-        insertCommand.run(ctx, executor);
-        if (ctx.getState().getStateType() == MysqlStateType.ERR) {
-            String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
-            LOG.warn("InsertInto state error:{}", errMsg);
-            throw new UserException(errMsg);
+        // 2. we save and pass overwrite auto-detected by insertCtx
+        InsertCommandContext insertCtx;
+        if (logicalQuery instanceof UnboundTableSink) {
+            insertCtx = new OlapInsertCommandContext(false,
+                    ((UnboundTableSink<?>) logicalQuery).isAutoDetectPartition(), groupId);
+        } else if (logicalQuery instanceof UnboundHiveTableSink) {
+            insertCtx = new HiveInsertCommandContext();
+            ((HiveInsertCommandContext) insertCtx).setOverwrite(true);
+        } else {
+            throw new UserException("Current catalog does not support insert overwrite yet.");
         }
+        runInsertCommand(logicalQuery, insertCtx, ctx, executor);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java
index 7feb53e24b0..1461c0626d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.physical;
 
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.properties.PhysicalProperties;
@@ -44,4 +45,6 @@ public abstract class PhysicalTableSink<CHILD_TYPE extends Plan> extends Physica
     }
 
     public abstract PhysicalProperties getRequirePhysicalProperties();
+
+    public abstract TableIf getTargetTable();
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java
new file mode 100644
index 00000000000..af57aae703b
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.DistributionDesc;
+import org.apache.doris.analysis.DropDbStmt;
+import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.HashDistributionDesc;
+import org.apache.doris.analysis.KeysDesc;
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.DatabaseMetadata;
+import org.apache.doris.datasource.ExternalDatabase;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.TableMetadata;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HiveMetadataOpsTest {
+
+    private HiveMetadataOps metadataOps;
+
+    @Mocked
+    private HMSCachedClient mockedClient;
+    @Mocked
+    private HMSExternalCatalog mockedCatalog;
+
+    @BeforeEach
+    public void init() {
+        metadataOps = new HiveMetadataOps(mockedCatalog, mockedClient);
+        new MockUp<HMSExternalCatalog>(HMSExternalCatalog.class) {
+            @Mock
+            public ExternalDatabase<? extends ExternalTable> getDbNullable(String dbName) {
+                return new HMSExternalDatabase(mockedCatalog, 0L, "mockedDb");
+            }
+
+            @Mock
+            public void onRefresh(boolean invalidCache) {
+                // mocked
+            }
+        };
+        new MockUp<HMSCachedClient>(HMSCachedClient.class) {
+            @Mock
+            public void createDatabase(DatabaseMetadata catalogDatabase) {
+                // mocked
+            }
+
+            @Mock
+            public void dropDatabase(String dbName) {
+                // mocked
+            }
+
+            @Mock
+            public void dropTable(String dbName, String tableName) {
+                // mocked
+            }
+
+            @Mock
+            public void createTable(TableMetadata catalogTable, boolean ignoreIfExists) {
+                // mocked
+            }
+        };
+    }
+
+    private void createDb(String dbName, Map<String, String> props) throws DdlException {
+        CreateDbStmt createDbStmt = new CreateDbStmt(true, dbName, props);
+        metadataOps.createDb(createDbStmt);
+    }
+
+    private void dropDb(String dbName, boolean forceDrop) throws DdlException {
+        DropDbStmt dropDbStmt = new DropDbStmt(true, dbName, forceDrop);
+        metadataOps.dropDb(dropDbStmt);
+    }
+
+    private void createTable(TableName tableName,
+                             List<Column> cols,
+                             List<String> parts,
+                             List<String> buckets,
+                             Map<String, String> props)
+            throws UserException {
+        PartitionDesc partitionDesc = new PartitionDesc(parts, null);
+        DistributionDesc distributionDesc = null;
+        if (!buckets.isEmpty()) {
+            distributionDesc = new HashDistributionDesc(10, buckets);
+        }
+        List<String> colsName = cols.stream().map(Column::getName).collect(Collectors.toList());
+        CreateTableStmt stmt = new CreateTableStmt(true, false,
+                tableName,
+                cols, null,
+                "hive",
+                new KeysDesc(KeysType.AGG_KEYS, colsName),
+                partitionDesc,
+                distributionDesc,
+                props,
+                props,
+                "comment",
+                null, null);
+        metadataOps.createTable(stmt);
+    }
+
+    private void dropTable(TableName tableName, boolean forceDrop) throws DdlException {
+        DropTableStmt dropTblStmt = new DropTableStmt(true, tableName, forceDrop);
+        metadataOps.dropTable(dropTblStmt);
+    }
+
+    @Test
+    public void testCreateAndDropAll() throws UserException {
+        Map<String, String> dbProps = new HashMap<>();
+        dbProps.put(HiveMetadataOps.LOCATION_URI_KEY, "file://loc/db");
+        createDb("mockedDb", dbProps);
+        Map<String, String> tblProps = new HashMap<>();
+        tblProps.put(HiveMetadataOps.FILE_FORMAT_KEY, "orc");
+        tblProps.put(HiveMetadataOps.LOCATION_URI_KEY, "file://loc/tbl");
+        tblProps.put("fs.defaultFS", "hdfs://ha");
+        TableName tableName = new TableName("mockedCtl", "mockedDb", 
"mockedTbl");
+        List<Column> cols = new ArrayList<>();
+        cols.add(new Column("id", Type.BIGINT));
+        cols.add(new Column("pt", Type.STRING));
+        cols.add(new Column("rate", Type.DOUBLE));
+        cols.add(new Column("time", Type.DATETIME));
+        List<String> parts = new ArrayList<>();
+        parts.add("pt");
+        List<String> bucks = new ArrayList<>();
+        // bucks.add("id");
+        createTable(tableName, cols, parts, bucks, tblProps);
+        dropTable(tableName, true);
+        dropDb("mockedDb", true);
+        // TODO: use TestWithFeService to double check plan
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 2316e65bf60..e5392fb11a8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -24,7 +24,6 @@ import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TUpdateMode;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.After;
@@ -97,8 +96,8 @@ public class HmsCommitTest {
         List<Column> columns = new ArrayList<>();
         columns.add(new Column("c1", PrimitiveType.INT, true));
         columns.add(new Column("c2", PrimitiveType.STRING, true));
-        List<FieldSchema> partitionKeys = new ArrayList<>();
-        partitionKeys.add(new FieldSchema("c3", "string", "comment"));
+        List<String> partitionKeys = new ArrayList<>();
+        partitionKeys.add("c3");
         HiveTableMetadata tableMetadata = new HiveTableMetadata(
                 dbName, tbWithPartition, columns, partitionKeys,
                 new HashMap<>(), fileFormat);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

