This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 754b55f5b8b [fix](catalog) close connection on refresh (#35426)
754b55f5b8b is described below

commit 754b55f5b8b029bc7fe52f7fc909cdfb6c1f4eda
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Thu May 30 18:52:51 2024 +0800

    [fix](catalog) close connection on refresh (#35426)
    
    1. When refresh the catalog, the related client(such as HMSClient)
    should be closed,
    or the connection may be leaked.
    
    2. Remove some unused code related to deprecated hive external table
---
 .../apache/doris/datasource/InternalCatalog.java   | 29 ---------
 .../doris/datasource/hive/HMSCachedClient.java     |  5 ++
 .../doris/datasource/hive/HMSExternalCatalog.java  |  8 +++
 .../datasource/hive/HiveMetaStoreClientHelper.java | 71 ----------------------
 .../doris/datasource/hive/HiveMetadataOps.java     |  9 ++-
 .../hive/PostgreSQLJdbcHMSCachedClient.java        |  5 ++
 .../datasource/hive/ThriftHMSCachedClient.java     | 18 +++++-
 .../datasource/iceberg/IcebergMetadataOps.java     |  4 ++
 .../datasource/operations/ExternalMetadataOps.java |  5 ++
 .../doris/datasource/TestHMSCachedClient.java      |  4 ++
 10 files changed, 55 insertions(+), 103 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index c9fd68cfe12..5f589ad1c66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -72,7 +72,6 @@ import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.EsTable;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.HashDistributionInfo;
-import org.apache.doris.catalog.HiveTable;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.InfoSchemaDb;
 import org.apache.doris.catalog.JdbcTable;
@@ -138,9 +137,6 @@ import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.es.EsRepository;
-import org.apache.doris.datasource.hive.HMSCachedClient;
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.event.DropPartitionEvent;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import 
org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType;
@@ -186,7 +182,6 @@ import lombok.Getter;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.StopWatch;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
@@ -3021,30 +3016,6 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         return checkCreateTableResult(tableName, tableId, result);
     }
 
-    private void createHiveTable(Database db, CreateTableStmt stmt) throws 
DdlException {
-        String tableName = stmt.getTableName();
-        List<Column> columns = stmt.getColumns();
-        long tableId = Env.getCurrentEnv().getNextId();
-        HiveTable hiveTable = new HiveTable(tableId, tableName, columns, 
stmt.getProperties());
-        hiveTable.setComment(stmt.getComment());
-        // check hive table whether exists in hive database
-        HiveConf hiveConf = new HiveConf();
-        hiveConf.set(HMSProperties.HIVE_METASTORE_URIS,
-                
hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
-        if 
(!Strings.isNullOrEmpty(hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION)))
 {
-            hiveConf.set(HMSProperties.HIVE_VERSION, 
hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION));
-        }
-        HMSCachedClient client = HiveMetadataOps.createCachedClient(hiveConf, 
1, null);
-        if (!client.tableExists(hiveTable.getHiveDb(), 
hiveTable.getHiveTable())) {
-            throw new DdlException(String.format("Table [%s] dose not exist in 
Hive.", hiveTable.getHiveDbTable()));
-        }
-        // check hive table if exists in doris database
-        if (!db.createTableWithLock(hiveTable, false, 
stmt.isSetIfNotExists()).first) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, 
tableName);
-        }
-        LOG.info("successfully create table[{}-{}]", tableName, tableId);
-    }
-
     private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws 
DdlException {
         String tableName = stmt.getTableName();
         List<Column> columns = stmt.getColumns();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index b10bfc39d44..c9d0ce1736b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -110,4 +110,9 @@ public interface HMSCachedClient {
     void addPartitions(String dbName, String tableName, 
List<HivePartitionWithStatistics> partitions);
 
     void dropPartition(String dbName, String tableName, List<String> 
partitionValues, boolean deleteData);
+
+    /**
+     * close the connection, eg, to hms
+     */
+    void close();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 9b4540f3b22..243dfb3c24f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -162,6 +162,14 @@ public class HMSExternalCatalog extends ExternalCatalog {
         metadataOps = hiveOps;
     }
 
+    @Override
+    public void onRefresh(boolean invalidCache) {
+        super.onRefresh(invalidCache);
+        if (metadataOps != null) {
+            metadataOps.close();
+        }
+    }
+
     @Override
     public List<String> listTableNames(SessionContext ctx, String dbName) {
         makeSureInitialized();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 6ae6b5ebac3..952454e4a95 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -32,36 +32,25 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.HiveTable;
 import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopUGI;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.thrift.TExprOpcode;
 
-import com.aliyun.datalake.metastore.common.DataLakeConfig;
-import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -76,7 +65,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import shade.doris.hive.org.apache.thrift.TException;
 
 import java.security.PrivilegedExceptionAction;
 import java.time.LocalDateTime;
@@ -148,65 +136,6 @@ public class HiveMetaStoreClientHelper {
         }
     }
 
-    private static IMetaStoreClient getClient(String metaStoreUris) throws 
DdlException {
-        HiveConf hiveConf = new HiveConf();
-        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUris);
-        hiveConf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
-                String.valueOf(Config.hive_metastore_client_timeout_second));
-        IMetaStoreClient metaStoreClient = null;
-        String type = hiveConf.get(HMSProperties.HIVE_METASTORE_TYPE);
-        try {
-            if ("dlf".equalsIgnoreCase(type)) {
-                // For aliyun DLF
-                hiveConf.set(DataLakeConfig.CATALOG_CREATE_DEFAULT_DB, 
"false");
-                metaStoreClient = new ProxyMetaStoreClient(hiveConf);
-            } else {
-                metaStoreClient = new HiveMetaStoreClient(hiveConf);
-            }
-        } catch (MetaException e) {
-            LOG.warn("Create HiveMetaStoreClient failed: {}", e.getMessage());
-            throw new DdlException("Create HiveMetaStoreClient failed: " + 
e.getMessage());
-        }
-        return metaStoreClient;
-    }
-
-    public static Table getTable(HiveTable hiveTable) throws DdlException {
-        IMetaStoreClient client = 
getClient(hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
-        Table table;
-        try {
-            table = client.getTable(hiveTable.getHiveDb(), 
hiveTable.getHiveTable());
-        } catch (TException e) {
-            LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
-            throw new DdlException("Connect hive metastore failed. Error: " + 
e.getMessage());
-        }
-        return table;
-    }
-
-    /**
-     * Get hive table with dbName and tableName.
-     * Only for Hudi.
-     *
-     * @param dbName database name
-     * @param tableName table name
-     * @param metaStoreUris hive metastore uris
-     * @return HiveTable
-     * @throws DdlException when get table from hive metastore failed.
-     */
-    @Deprecated
-    public static Table getTable(String dbName, String tableName, String 
metaStoreUris) throws DdlException {
-        IMetaStoreClient client = getClient(metaStoreUris);
-        Table table;
-        try {
-            table = client.getTable(dbName, tableName);
-        } catch (TException e) {
-            LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
-            throw new DdlException("Connect hive metastore failed. Error: " + 
e.getMessage());
-        } finally {
-            client.close();
-        }
-        return table;
-    }
-
     /**
      * Convert Doris expr to Hive expr, only for partition column
      * @param tblName
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 72f19329046..70c61875b8a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -81,8 +81,8 @@ public class HiveMetadataOps implements ExternalMetadataOps {
         return catalog;
     }
 
-    public static HMSCachedClient createCachedClient(HiveConf hiveConf, int 
thriftClientPoolSize,
-                                                     JdbcClientConfig 
jdbcClientConfig) {
+    private static HMSCachedClient createCachedClient(HiveConf hiveConf, int 
thriftClientPoolSize,
+            JdbcClientConfig jdbcClientConfig) {
         if (hiveConf != null) {
             return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
         }
@@ -266,6 +266,11 @@ public class HiveMetadataOps implements 
ExternalMetadataOps {
         return listDatabaseNames().contains(dbName);
     }
 
+    @Override
+    public void close() {
+        client.close();
+    }
+
     public List<String> listDatabaseNames() {
         return client.getAllDatabases();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
index 932118001e5..8e41b48bfdd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
@@ -63,6 +63,11 @@ public class PostgreSQLJdbcHMSCachedClient extends 
JdbcHMSCachedClient {
         super(jdbcClientConfig);
     }
 
+    @Override
+    public void close() {
+        // the jdbc connection is used on demand, so we do not need to close 
it.
+    }
+
     @Override
     public Database getDatabase(String dbName) {
         throw new HMSClientException("Do not support in 
PostgreSQLJdbcHMSCachedClient.");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index 0f74da32018..0acf8893267 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -89,6 +89,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient 
{
     private static final short MAX_LIST_PARTITION_NUM = 
Config.max_hive_list_partition_num;
 
     private Queue<ThriftHMSClient> clientPool = new LinkedList<>();
+    private boolean isClosed = false;
     private final int poolSize;
     private final HiveConf hiveConf;
 
@@ -100,6 +101,21 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
         }
         this.hiveConf = hiveConf;
         this.poolSize = poolSize;
+        this.isClosed = false;
+    }
+
+    @Override
+    public void close() {
+        synchronized (clientPool) {
+            this.isClosed = true;
+            while (!clientPool.isEmpty()) {
+                try {
+                    clientPool.poll().close();
+                } catch (Exception e) {
+                    LOG.warn("failed to close thrift client", e);
+                }
+            }
+        }
     }
 
     @Override
@@ -604,7 +620,7 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
         @Override
         public void close() throws Exception {
             synchronized (clientPool) {
-                if (throwable != null || clientPool.size() > poolSize) {
+                if (isClosed || throwable != null || clientPool.size() > 
poolSize) {
                     client.close();
                 } else {
                     clientPool.offer(this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index c59db7b4b79..7161f48680a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -64,6 +64,10 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
         return catalog;
     }
 
+    @Override
+    public void close() {
+    }
+
     @Override
     public boolean tableExist(String dbName, String tblName) {
         return catalog.tableExists(TableIdentifier.of(dbName, tblName));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
index b603b7a3ca7..9426442cebb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
@@ -82,4 +82,9 @@ public interface ExternalMetadataOps {
     boolean tableExist(String dbName, String tblName);
 
     boolean databaseExist(String dbName);
+
+    /**
+     * close the connection, eg, to hms
+     */
+    void close();
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
index dd2e8dc2d11..6f969257245 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
@@ -52,6 +52,10 @@ public class TestHMSCachedClient implements HMSCachedClient {
     public Map<String, List<Table>> tables = new HashMap<>();
     public List<Database> dbs = new ArrayList<>();
 
+    @Override
+    public void close() {
+    }
+
     @Override
     public Database getDatabase(String dbName) {
         for (Database db : this.dbs) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to