This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 754b55f5b8b [fix](catalog) close connection on refresh (#35426)
754b55f5b8b is described below

commit 754b55f5b8b029bc7fe52f7fc909cdfb6c1f4eda
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Thu May 30 18:52:51 2024 +0800

    [fix](catalog) close connection on refresh (#35426)

    1. When refreshing a catalog, the related client (such as HMSClient) should be closed;
       otherwise the connection may be leaked.
    2. Remove some unused code related to the deprecated Hive external table.
---
 .../apache/doris/datasource/InternalCatalog.java   | 29 ---------
 .../doris/datasource/hive/HMSCachedClient.java     |  5 ++
 .../doris/datasource/hive/HMSExternalCatalog.java  |  8 +++
 .../datasource/hive/HiveMetaStoreClientHelper.java | 71 ----------------------
 .../doris/datasource/hive/HiveMetadataOps.java     |  9 ++-
 .../hive/PostgreSQLJdbcHMSCachedClient.java        |  5 ++
 .../datasource/hive/ThriftHMSCachedClient.java     | 18 +++++-
 .../datasource/iceberg/IcebergMetadataOps.java     |  4 ++
 .../datasource/operations/ExternalMetadataOps.java |  5 ++
 .../doris/datasource/TestHMSCachedClient.java      |  4 ++
 10 files changed, 55 insertions(+), 103 deletions(-)
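Point 1 is the essence of the fix: refreshing a catalog rebuilds its metadata client, and if the old client is not closed first, its metastore connections accumulate. For illustration, a minimal, self-contained Java sketch of the close-on-refresh pattern, using hypothetical names (MetadataOps, CountingOps, Catalog, RefreshLeakSketch) rather than the real ExternalMetadataOps/HMSExternalCatalog classes in the diff below, might look like this:

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical stand-ins for the ExternalMetadataOps / HMSCachedClient pair.
    interface MetadataOps {
        void close();                     // releases the underlying metastore connection
    }

    class CountingOps implements MetadataOps {
        static final AtomicInteger OPEN = new AtomicInteger();

        CountingOps() {
            OPEN.incrementAndGet();       // "open" a connection
        }

        @Override
        public void close() {
            OPEN.decrementAndGet();       // "close" the connection
        }
    }

    class Catalog {
        private MetadataOps ops = new CountingOps();

        // Mirrors the fix: close the old client whenever the catalog is refreshed.
        void onRefresh() {
            if (ops != null) {
                ops.close();
            }
            ops = new CountingOps();
        }
    }

    public class RefreshLeakSketch {
        public static void main(String[] args) {
            Catalog catalog = new Catalog();
            for (int i = 0; i < 100; i++) {
                catalog.onRefresh();
            }
            // Prints 1 with the close() call in onRefresh(); without it, 101 connections would stay open.
            System.out.println("open connections: " + CountingOps.OPEN.get());
        }
    }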
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index c9fd68cfe12..5f589ad1c66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -72,7 +72,6 @@ import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.EsTable;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.HashDistributionInfo;
-import org.apache.doris.catalog.HiveTable;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.InfoSchemaDb;
 import org.apache.doris.catalog.JdbcTable;
@@ -138,9 +137,6 @@ import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.es.EsRepository;
-import org.apache.doris.datasource.hive.HMSCachedClient;
-import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.event.DropPartitionEvent;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType;
@@ -186,7 +182,6 @@ import lombok.Getter;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.StopWatch;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
@@ -3021,30 +3016,6 @@ public class InternalCatalog implements CatalogIf<Database> {
         return checkCreateTableResult(tableName, tableId, result);
     }
 
-    private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
-        String tableName = stmt.getTableName();
-        List<Column> columns = stmt.getColumns();
-        long tableId = Env.getCurrentEnv().getNextId();
-        HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties());
-        hiveTable.setComment(stmt.getComment());
-        // check hive table whether exists in hive database
-        HiveConf hiveConf = new HiveConf();
-        hiveConf.set(HMSProperties.HIVE_METASTORE_URIS,
-                hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
-        if (!Strings.isNullOrEmpty(hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION))) {
-            hiveConf.set(HMSProperties.HIVE_VERSION, hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION));
-        }
-        HMSCachedClient client = HiveMetadataOps.createCachedClient(hiveConf, 1, null);
-        if (!client.tableExists(hiveTable.getHiveDb(), hiveTable.getHiveTable())) {
-            throw new DdlException(String.format("Table [%s] dose not exist in Hive.", hiveTable.getHiveDbTable()));
-        }
-        // check hive table if exists in doris database
-        if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-        }
-        LOG.info("successfully create table[{}-{}]", tableName, tableId);
-    }
-
     private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
         String tableName = stmt.getTableName();
         List<Column> columns = stmt.getColumns();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index b10bfc39d44..c9d0ce1736b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -110,4 +110,9 @@ public interface HMSCachedClient {
     void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions);
 
     void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData);
+
+    /**
+     * close the connection, eg, to hms
+     */
+    void close();
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 9b4540f3b22..243dfb3c24f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -162,6 +162,14 @@ public class HMSExternalCatalog extends ExternalCatalog {
         metadataOps = hiveOps;
     }
 
+    @Override
+    public void onRefresh(boolean invalidCache) {
+        super.onRefresh(invalidCache);
+        if (metadataOps != null) {
+            metadataOps.close();
+        }
+    }
+
     @Override
     public List<String> listTableNames(SessionContext ctx, String dbName) {
         makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 6ae6b5ebac3..952454e4a95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -32,36 +32,25 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.HiveTable;
 import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopUGI;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.thrift.TExprOpcode;
 
-import com.aliyun.datalake.metastore.common.DataLakeConfig;
-import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -76,7 +65,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import shade.doris.hive.org.apache.thrift.TException;
 
 import java.security.PrivilegedExceptionAction;
 import java.time.LocalDateTime;
@@ -148,65 +136,6 @@ public class HiveMetaStoreClientHelper {
         }
     }
 
-    private static IMetaStoreClient getClient(String metaStoreUris) throws DdlException {
-        HiveConf hiveConf = new HiveConf();
-        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUris);
-        hiveConf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
-                String.valueOf(Config.hive_metastore_client_timeout_second));
-        IMetaStoreClient metaStoreClient = null;
-        String type = hiveConf.get(HMSProperties.HIVE_METASTORE_TYPE);
-        try {
-            if ("dlf".equalsIgnoreCase(type)) {
-                // For aliyun DLF
-                hiveConf.set(DataLakeConfig.CATALOG_CREATE_DEFAULT_DB, "false");
-                metaStoreClient = new ProxyMetaStoreClient(hiveConf);
-            } else {
-                metaStoreClient = new HiveMetaStoreClient(hiveConf);
-            }
-        } catch (MetaException e) {
-            LOG.warn("Create HiveMetaStoreClient failed: {}", e.getMessage());
-            throw new DdlException("Create HiveMetaStoreClient failed: " + e.getMessage());
-        }
-        return metaStoreClient;
-    }
-
-    public static Table getTable(HiveTable hiveTable) throws DdlException {
-        IMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
-        Table table;
-        try {
-            table = client.getTable(hiveTable.getHiveDb(), hiveTable.getHiveTable());
-        } catch (TException e) {
-            LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
-            throw new DdlException("Connect hive metastore failed. Error: " + e.getMessage());
-        }
-        return table;
-    }
-
-    /**
-     * Get hive table with dbName and tableName.
-     * Only for Hudi.
-     *
-     * @param dbName database name
-     * @param tableName table name
-     * @param metaStoreUris hive metastore uris
-     * @return HiveTable
-     * @throws DdlException when get table from hive metastore failed.
-     */
-    @Deprecated
-    public static Table getTable(String dbName, String tableName, String metaStoreUris) throws DdlException {
-        IMetaStoreClient client = getClient(metaStoreUris);
-        Table table;
-        try {
-            table = client.getTable(dbName, tableName);
-        } catch (TException e) {
-            LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
-            throw new DdlException("Connect hive metastore failed. Error: " + e.getMessage());
-        } finally {
-            client.close();
-        }
-        return table;
-    }
-
     /**
      * Convert Doris expr to Hive expr, only for partition column
      * @param tblName
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 72f19329046..70c61875b8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -81,8 +81,8 @@ public class HiveMetadataOps implements ExternalMetadataOps {
         return catalog;
     }
 
-    public static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
-            JdbcClientConfig jdbcClientConfig) {
+    private static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
+            JdbcClientConfig jdbcClientConfig) {
         if (hiveConf != null) {
             return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
         }
@@ -266,6 +266,11 @@ public class HiveMetadataOps implements ExternalMetadataOps {
         return listDatabaseNames().contains(dbName);
     }
 
+    @Override
+    public void close() {
+        client.close();
+    }
+
     public List<String> listDatabaseNames() {
         return client.getAllDatabases();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
index 932118001e5..8e41b48bfdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
@@ -63,6 +63,11 @@ public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient {
         super(jdbcClientConfig);
     }
 
+    @Override
+    public void close() {
+        // the jdbc connection is used on demand, so we do not need to close it.
+    }
+
     @Override
     public Database getDatabase(String dbName) {
         throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient.");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index 0f74da32018..0acf8893267 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -89,6 +89,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
     private static final short MAX_LIST_PARTITION_NUM = Config.max_hive_list_partition_num;
 
     private Queue<ThriftHMSClient> clientPool = new LinkedList<>();
+    private boolean isClosed = false;
     private final int poolSize;
     private final HiveConf hiveConf;
 
@@ -100,6 +101,21 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
         }
         this.hiveConf = hiveConf;
         this.poolSize = poolSize;
+        this.isClosed = false;
+    }
+
+    @Override
+    public void close() {
+        synchronized (clientPool) {
+            this.isClosed = true;
+            while (!clientPool.isEmpty()) {
+                try {
+                    clientPool.poll().close();
+                } catch (Exception e) {
+                    LOG.warn("failed to close thrift client", e);
+                }
+            }
+        }
     }
 
     @Override
@@ -604,7 +620,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
         @Override
         public void close() throws Exception {
             synchronized (clientPool) {
-                if (throwable != null || clientPool.size() > poolSize) {
+                if (isClosed || throwable != null || clientPool.size() > poolSize) {
                     client.close();
                 } else {
                     clientPool.offer(this);
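The ThriftHMSCachedClient hunks above carry the mechanism that actually releases connections: close() drains the pooled clients, and the new isClosed flag keeps a client that is returned later from being recycled into an already-closed pool. For illustration, a minimal, self-contained sketch of the same guard, using hypothetical names (ClientPool, Conn, giveBack) rather than the Doris classes, might look like this:

    import java.util.LinkedList;
    import java.util.Queue;

    // Hypothetical stand-in for a pooled thrift connection.
    class Conn {
        void shutdown() {
            System.out.println("connection closed");
        }
    }

    public class ClientPool {
        private final Queue<Conn> pool = new LinkedList<>();
        private final int poolSize;
        private boolean isClosed = false;

        ClientPool(int poolSize) {
            this.poolSize = poolSize;
        }

        Conn borrow() {
            synchronized (pool) {
                Conn conn = pool.poll();
                return conn != null ? conn : new Conn();
            }
        }

        // Mirrors the per-connection close(): recycle only while the pool is still open.
        void giveBack(Conn conn, boolean broken) {
            synchronized (pool) {
                if (isClosed || broken || pool.size() > poolSize) {
                    conn.shutdown();
                } else {
                    pool.offer(conn);
                }
            }
        }

        // Mirrors the pool-level close(): mark closed and drain everything cached.
        void close() {
            synchronized (pool) {
                isClosed = true;
                while (!pool.isEmpty()) {
                    pool.poll().shutdown();
                }
            }
        }

        public static void main(String[] args) {
            ClientPool pool = new ClientPool(8);
            Conn c = pool.borrow();
            pool.close();            // e.g. triggered by a catalog refresh
            pool.giveBack(c, false); // not recycled: shut down immediately
        }
    }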
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index c59db7b4b79..7161f48680a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -64,6 +64,10 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
         return catalog;
     }
 
+    @Override
+    public void close() {
+    }
+
     @Override
     public boolean tableExist(String dbName, String tblName) {
         return catalog.tableExists(TableIdentifier.of(dbName, tblName));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
index b603b7a3ca7..9426442cebb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
@@ -82,4 +82,9 @@ public interface ExternalMetadataOps {
     boolean tableExist(String dbName, String tblName);
 
     boolean databaseExist(String dbName);
+
+    /**
+     * close the connection, eg, to hms
+     */
+    void close();
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
index dd2e8dc2d11..6f969257245 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
@@ -52,6 +52,10 @@ public class TestHMSCachedClient implements HMSCachedClient {
     public Map<String, List<Table>> tables = new HashMap<>();
     public List<Database> dbs = new ArrayList<>();
 
+    @Override
+    public void close() {
+    }
+
     @Override
     public Database getDatabase(String dbName) {
         for (Database db : this.dbs) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org