This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7eed5a292c [feature-wip](multi-catalog) Support hive partition cache (#14134) 7eed5a292c is described below commit 7eed5a292c71a5ff67dce8333e58e4aa49ddb276 Author: Mingyu Chen <morning...@163.com> AuthorDate: Mon Nov 14 14:12:40 2022 +0800 [feature-wip](multi-catalog) Support hive partition cache (#14134) --- be/src/exec/text_converter.hpp | 2 +- docs/en/docs/admin-manual/config/fe-config.md | 52 +++ docs/zh-CN/docs/admin-manual/config/fe-config.md | 43 ++- .../maint-monitor/monitor-metrics/metrics.md | 5 + .../java/org/apache/doris/analysis/ColumnDef.java | 4 +- .../main/java/org/apache/doris/catalog/Env.java | 8 + .../org/apache/doris/catalog/PartitionKey.java | 21 +- .../org/apache/doris/catalog/RefreshManager.java | 3 +- .../doris/catalog/external/EsExternalDatabase.java | 24 +- .../doris/catalog/external/EsExternalTable.java | 82 +--- .../doris/catalog/external/ExternalDatabase.java | 27 +- .../doris/catalog/external/ExternalTable.java | 43 +-- .../catalog/external/HMSExternalDatabase.java | 24 +- .../doris/catalog/external/HMSExternalTable.java | 136 ++----- .../main/java/org/apache/doris/common/Config.java | 28 ++ .../apache/doris/datasource/CacheException.java | 24 ++ .../org/apache/doris/datasource/CatalogMgr.java | 25 +- .../apache/doris/datasource/EsExternalCatalog.java | 7 + .../apache/doris/datasource/ExternalCatalog.java | 7 +- .../doris/datasource/ExternalMetaCacheMgr.java | 95 +++++ .../doris/datasource/ExternalSchemaCache.java | 131 +++++++ .../doris/datasource/HMSClientException.java | 24 ++ .../doris/datasource/HMSExternalCatalog.java | 15 + .../datasource/PooledHiveMetaStoreClient.java | 42 +- .../doris/datasource/hive/HiveMetaStoreCache.java | 423 +++++++++++++++++++++ .../doris/datasource/hive/HivePartition.java | 46 +++ .../java/org/apache/doris/persist/EditLog.java | 8 +- .../org/apache/doris/persist/OperationType.java | 1 + .../java/org/apache/doris/planner/ScanNode.java | 1 - .../planner/external/ExternalFileScanNode.java | 12 +- .../doris/planner/external/HiveScanProvider.java | 151 ++++---- .../doris/planner/external/HudiScanProvider.java | 7 +- .../planner/external/IcebergScanProvider.java | 9 +- .../doris/planner/external/QueryScanProvider.java | 14 +- .../java/org/apache/doris/qe/StmtExecutor.java | 2 +- 35 files changed, 1155 insertions(+), 391 deletions(-) diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp index 4eaa065947..45a7a4e570 100644 --- a/be/src/exec/text_converter.hpp +++ b/be/src/exec/text_converter.hpp @@ -206,7 +206,6 @@ inline bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc, } StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS; - // Parse the raw-text data. Translate the text string to internal format. 
     switch (slot_desc->type().type) {
         case TYPE_HLL: {
@@ -314,6 +313,7 @@ inline bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
             size_t size = nullable_column->get_null_map_data().size();
             doris::vectorized::NullMap& null_map_data = nullable_column->get_null_map_data();
             null_map_data[size - 1] = 1;
+            nullable_column->get_nested_column().insert_default();
         }
         return false;
     }
diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md
index 66ae4c75d7..71fc393f72 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2258,3 +2258,55 @@ Default: 1
 Is it possible to configure dynamically: true
 
 Whether it is a configuration item unique to the Master FE node: true
+
+### `max_replica_count_when_schema_change`
+
+The maximum number of replicas allowed when OlapTable is doing a schema change. Too many replicas will lead to FE OOM.
+
+Default: 100000
+
+Is it possible to configure dynamically: true
+
+Whether it is a configuration item unique to the Master FE node: true
+
+### `max_hive_partition_cache_num`
+
+The maximum number of cached hive partitions.
+
+Default: 100000
+
+Is it possible to configure dynamically: false
+
+Whether it is a configuration item unique to the Master FE node: false
+
+### `max_external_file_cache_num`
+
+Maximum number of file caches to use for external tables.
+
+Default: 100000
+
+Is it possible to configure dynamically: false
+
+Whether it is a configuration item unique to the Master FE node: false
+
+### `max_external_schema_cache_num`
+
+Maximum number of schema caches to use for external tables.
+
+Default: 10000
+
+Is it possible to configure dynamically: false
+
+Whether it is a configuration item unique to the Master FE node: false
+
+### `external_cache_expire_time_minutes_after_access`
+
+Sets how long cached data remains valid after the last access, in minutes.
+Applies to the External Schema Cache as well as the Hive Partition Cache.
+
+Default: 1440
+
+Is it possible to configure dynamically: false
+
+Whether it is a configuration item unique to the Master FE node: false
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 957f87bcd6..dd2f238032 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2315,7 +2315,7 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
 
 是否为 Master FE 节点独有的配置项:true
 
-### max_replica_count_when_schema_change
+### `max_replica_count_when_schema_change`
 
 OlapTable在做schema change时,允许的最大副本数,副本数过大会导致FE OOM。
 
@@ -2324,3 +2324,44 @@ OlapTable在做schema change时,允许的最大副本数,副本数过大会
 是否可以动态配置:true
 
 是否为 Master FE 节点独有的配置项:true
+
+### `max_hive_partition_cache_num`
+
+hive partition 的最大缓存数量。
+
+默认值:100000
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
+
+### `max_external_file_cache_num`
+
+用于外部表的最大文件缓存数量。
+
+默认值:100000
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
+
+### `max_external_schema_cache_num`
+
+用于外部表的最大 schema 缓存数量。
+
+默认值:10000
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
+
+### `external_cache_expire_time_minutes_after_access`
+
+设置缓存中的数据在最后一次访问后多久失效,单位为分钟。
+适用于 External Schema Cache 以及 Hive Partition Cache。
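As an aside: the four options documented above are consumed by Guava's CacheBuilder in the new cache classes later in this diff (ExternalSchemaCache.init() and HiveMetaStoreCache.init()). A minimal sketch of that wiring, with a simplified String-keyed cache and a hypothetical class name standing in for the real key classes:

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class CacheConfigSketch {
        // Values mirror the defaults documented above.
        static final long MAX_EXTERNAL_SCHEMA_CACHE_NUM = 10000;
        static final long EXPIRE_MINUTES_AFTER_ACCESS = 1440;

        static LoadingCache<String, String> build() {
            return CacheBuilder.newBuilder()
                    // max_external_schema_cache_num bounds the entry count
                    .maximumSize(MAX_EXTERNAL_SCHEMA_CACHE_NUM)
                    // external_cache_expire_time_minutes_after_access evicts idle entries
                    .expireAfterAccess(EXPIRE_MINUTES_AFTER_ACCESS, TimeUnit.MINUTES)
                    .build(new CacheLoader<String, String>() {
                        @Override
                        public String load(String key) {
                            return "schema-for-" + key; // placeholder loader
                        }
                    });
        }
    }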
+ +默认值:1440 + +是否可以动态配置:false + +是否为 Master FE 节点独有的配置项:false diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md index 206f021e2e..96ede4e7d5 100644 --- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md +++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md @@ -143,6 +143,11 @@ curl http://be_host:webserver_port/metrics?type=json |`doris_fe_txn_replica_num`|| Num| 指定DB正在执行的事务打开的副本数。如 {db="test"} 表示DB test 当前正在执行的事务打开的副本数 |该数值可以观测某个DB是否打开了过多的副本,可能会影响其他事务执行| P0 | |`doris_fe_thrift_rpc_total`|| Num| FE thrift接口各个方法接收的RPC请求次数。如 {method="report"} 表示 report 方法接收的RPC请求次数 |该数值可以观测某个thrift rpc方法的负载| | |`doris_fe_thrift_rpc_latency_ms`|| 毫秒| FE thrift接口各个方法接收的RPC请求耗时。如 {method="report"} 表示 report 方法接收的RPC请求耗时 |该数值可以观测某个thrift rpc方法的负载| | +|`doris_fe_external_schema_cache` | {catalog="hive"} | Num | 指定 External Catalog 对应的 schema cache 的数量 ||| +|`doris_fe_hive_meta_cache` | {catalog="hive"} | Num | ||| +| | `{type="partition_value"}` | Num | 指定 External Hive Metastore Catalog 对应的 partition value cache 的数量 ||| +| | `{type="partition"}` | Num | 指定 External Hive Metastore Catalog 对应的 partition cache 的数量 ||| +| | `{type="file"}` | Num | 指定 External Hive Metastore Catalog 对应的 file cache 的数量 ||| ### JVM 监控 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java index 87267e69d2..36b556c623 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java @@ -199,14 +199,14 @@ public class ColumnDef { } FeNameFormat.checkColumnName(name); - // When string type length is not assigned, it need to be assigned to 1. + // When string type length is not assigned, it needs to be assigned to 1. 
if (typeDef.getType().isScalarType()) { final ScalarType targetType = (ScalarType) typeDef.getType(); if (targetType.getPrimitiveType().isStringType() && !targetType.isLengthSet()) { if (targetType.getPrimitiveType() != PrimitiveType.STRING) { targetType.setLength(1); } else { - // alway set text length MAX_STRING_LENGTH + // always set text length MAX_STRING_LENGTH targetType.setLength(ScalarType.MAX_STRING_LENGTH); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 022f86cbf0..8ce6d7a346 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -124,6 +124,7 @@ import org.apache.doris.consistency.ConsistencyChecker; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.EsExternalCatalog; +import org.apache.doris.datasource.ExternalMetaCacheMgr; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.deploy.DeployManager; import org.apache.doris.deploy.impl.AmbariDeployManager; @@ -450,6 +451,8 @@ public class Env { private final StatisticsCache statisticsCache; + private ExternalMetaCacheMgr extMetaCacheMgr; + public List<Frontend> getFrontends(FrontendNodeType nodeType) { if (nodeType == null) { // get all @@ -515,6 +518,10 @@ public class Env { return mtmvJobManager; } + public ExternalMetaCacheMgr getExtMetaCacheMgr() { + return extMetaCacheMgr; + } + public CatalogIf getCurrentCatalog() { ConnectContext ctx = ConnectContext.get(); if (ctx == null) { @@ -646,6 +653,7 @@ public class Env { this.mtmvJobManager = new MTMVJobManager(); this.analysisJobScheduler = new AnalysisJobScheduler(); this.statisticsCache = new StatisticsCache(); + this.extMetaCacheMgr = new ExternalMetaCacheMgr(); } public static void destroyCheckpoint() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java index d8bcb961e2..f773209cd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java @@ -40,6 +40,7 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.stream.Collectors; import java.util.zip.CRC32; public class PartitionKey implements Comparable<PartitionKey>, Writable { @@ -86,7 +87,7 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable { return partitionKey; } - public static PartitionKey createListPartitionKey(List<PartitionValue> values, List<Column> columns) + public static PartitionKey createListPartitionKeyWithTypes(List<PartitionValue> values, List<Type> types) throws AnalysisException { // for multi list partition: // @@ -108,18 +109,24 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable { // PARTITION p6 VALUES IN ("26") // ) // - Preconditions.checkArgument(values.size() == columns.size(), - "in value size[" + values.size() + "] is not equal to partition column size[" + columns.size() + "]."); + Preconditions.checkArgument(values.size() == types.size(), + "in value size[" + values.size() + "] is not equal to partition column size[" + types.size() + "]."); PartitionKey partitionKey = new PartitionKey(); for (int i = 0; i < values.size(); i++) { - 
partitionKey.keys.add(values.get(i).getValue(Type.fromPrimitiveType(columns.get(i).getDataType()))); - partitionKey.types.add(columns.get(i).getDataType()); + partitionKey.keys.add(values.get(i).getValue(types.get(i))); + partitionKey.types.add(types.get(i).getPrimitiveType()); } return partitionKey; } + public static PartitionKey createListPartitionKey(List<PartitionValue> values, List<Column> columns) + throws AnalysisException { + List<Type> types = columns.stream().map(c -> c.getType()).collect(Collectors.toList()); + return createListPartitionKeyWithTypes(values, types); + } + public void pushColumn(LiteralExpr keyValue, PrimitiveType keyType) { keys.add(keyValue); types.add(keyType); @@ -163,6 +170,10 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable { return true; } + public List<String> getPartitionValuesAsStringList() { + return keys.stream().map(k -> k.getStringValue()).collect(Collectors.toList()); + } + public static int compareLiteralExpr(LiteralExpr key1, LiteralExpr key2) { int ret = 0; if (key1 instanceof MaxLiteral || key2 instanceof MaxLiteral) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index bb37a9b1b9..439c8c73c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -23,7 +23,6 @@ import org.apache.doris.analysis.RefreshDbStmt; import org.apache.doris.analysis.RefreshTableStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.external.ExternalDatabase; -import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.CatalogIf; @@ -160,7 +159,7 @@ public class RefreshManager { if (table == null) { throw new DdlException("Table " + tableName + " does not exist in db " + dbName); } - ((ExternalTable) table).setUnInitialized(); + Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId(), dbName, tableName); ExternalObjectLog log = new ExternalObjectLog(); log.setCatalogId(catalog.getId()); log.setDbId(db.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java index 784c0b8bdc..e9b3ce354b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java @@ -18,12 +18,10 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Env; -import org.apache.doris.common.util.Util; import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitDatabaseLog; import org.apache.doris.persist.gson.GsonPostProcessable; -import org.apache.doris.qe.MasterCatalogExecutor; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; @@ -65,7 +63,6 @@ public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> implem Map<Long, EsExternalTable> tmpIdToTbl = Maps.newConcurrentMap(); for (int i = 0; i < log.getRefreshCount(); i++) { EsExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i)); - table.setUnInitialized(); tmpTableNameToId.put(table.getName(), table.getId()); 
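Worth a brief illustration: the PartitionKey helpers added above (createListPartitionKeyWithTypes and getPartitionValuesAsStringList) give the new hive caches a way to turn raw metastore strings into typed keys and back. A hypothetical round trip, assuming an INT and a STRING partition column and PartitionValue's single-string constructor (the values are made up):

    import java.util.List;

    import com.google.common.collect.Lists;

    import org.apache.doris.analysis.PartitionValue;
    import org.apache.doris.catalog.PartitionKey;
    import org.apache.doris.catalog.Type;
    import org.apache.doris.common.AnalysisException;

    public class PartitionKeySketch {
        static void roundTrip() throws AnalysisException {
            List<Type> types = Lists.newArrayList(Type.INT, Type.STRING);
            List<PartitionValue> values = Lists.newArrayList(
                    new PartitionValue("26"), new PartitionValue("beijing"));
            // Build a typed key without needing Column objects.
            PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types);
            // Recover the raw strings, e.g. for a metastore call: ["26", "beijing"]
            List<String> raw = key.getPartitionValuesAsStringList();
        }
    }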
tmpIdToTbl.put(table.getId(), table); } @@ -86,24 +83,8 @@ public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> implem } } - public synchronized void makeSureInitialized() { - if (!initialized) { - if (!Env.getCurrentEnv().isMaster()) { - // Forward to master and wait the journal to replay. - MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); - try { - remoteExecutor.forward(extCatalog.getId(), id, -1); - } catch (Exception e) { - Util.logAndThrowRuntimeException(LOG, - String.format("failed to forward init external db %s operation to master", name), e); - } - return; - } - init(); - } - } - - private void init() { + @Override + protected void init() { InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); initDatabaseLog.setType(InitDatabaseLog.Type.ES); initDatabaseLog.setCatalogId(extCatalog.getId()); @@ -118,7 +99,6 @@ public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> implem tblId = tableNameToId.get(tableName); tmpTableNameToId.put(tableName, tblId); EsExternalTable table = idToTbl.get(tblId); - table.setUnInitialized(); tmpIdToTbl.put(tblId, table); initDatabaseLog.addRefreshTable(tblId); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java index 5cbafdcd15..eb8c8972b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java @@ -18,13 +18,8 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.EsTable; -import org.apache.doris.common.util.Util; import org.apache.doris.datasource.EsExternalCatalog; -import org.apache.doris.datasource.InitTableLog; -import org.apache.doris.external.elasticsearch.EsUtil; -import org.apache.doris.qe.MasterCatalogExecutor; import org.apache.doris.thrift.TEsTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -54,84 +49,13 @@ public class EsExternalTable extends ExternalTable { super(id, name, catalog, dbName, TableType.ES_EXTERNAL_TABLE); } - public synchronized void makeSureInitialized() { - if (!initialized) { - if (!Env.getCurrentEnv().isMaster()) { - fullSchema = null; - // Forward to master and wait the journal to replay. 
- MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); - try { - remoteExecutor.forward(catalog.getId(), catalog.getDbNullable(dbName).getId(), id); - } catch (Exception e) { - Util.logAndThrowRuntimeException(LOG, - String.format("failed to forward init external table %s operation to master", name), e); - } - } else { - init(); - } - } if (!objectCreated) { esTable = toEsTable(); objectCreated = true; } } - private void init() { - boolean schemaChanged = false; - List<Column> tmpSchema = EsUtil.genColumnsFromEs( - ((EsExternalCatalog) catalog).getEsRestClient(), name, null); - if (fullSchema == null || fullSchema.size() != tmpSchema.size()) { - schemaChanged = true; - } else { - for (int i = 0; i < fullSchema.size(); i++) { - if (!fullSchema.get(i).equals(tmpSchema.get(i))) { - schemaChanged = true; - break; - } - } - } - if (schemaChanged) { - timestamp = System.currentTimeMillis(); - fullSchema = tmpSchema; - esTable = toEsTable(); - } - initialized = true; - InitTableLog initTableLog = new InitTableLog(); - initTableLog.setCatalogId(catalog.getId()); - initTableLog.setDbId(catalog.getDbNameToId().get(dbName)); - initTableLog.setTableId(id); - initTableLog.setSchema(fullSchema); - Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog); - } - - @Override - public List<Column> getFullSchema() { - makeSureInitialized(); - return fullSchema; - } - - @Override - public List<Column> getBaseSchema() { - return getFullSchema(); - } - - @Override - public List<Column> getBaseSchema(boolean full) { - return getFullSchema(); - } - - @Override - public Column getColumn(String name) { - makeSureInitialized(); - for (Column column : fullSchema) { - if (name.equals(column.getName())) { - return column; - } - } - return null; - } - public EsTable getEsTable() { makeSureInitialized(); return esTable; @@ -151,16 +75,18 @@ public class EsExternalTable extends ExternalTable { @Override public TTableDescriptor toThrift() { + List<Column> schema = getFullSchema(); TEsTable tEsTable = new TEsTable(); - TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ES_TABLE, fullSchema.size(), 0, + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ES_TABLE, schema.size(), 0, getName(), ""); tTableDescriptor.setEsTable(tEsTable); return tTableDescriptor; } private EsTable toEsTable() { + List<Column> schema = getFullSchema(); EsExternalCatalog esCatalog = (EsExternalCatalog) catalog; - EsTable esTable = new EsTable(this.id, this.name, this.fullSchema, TableType.ES_EXTERNAL_TABLE); + EsTable esTable = new EsTable(this.id, this.name, schema, TableType.ES_EXTERNAL_TABLE); esTable.setIndexName(name); esTable.setClient(esCatalog.getEsRestClient()); esTable.setUserName(esCatalog.getUsername()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index c5fa7b9875..87d1cccf5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -19,15 +19,18 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.DatabaseProperty; +import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; 
+import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitDatabaseLog; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.MasterCatalogExecutor; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang.NotImplementedException; @@ -87,7 +90,8 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>, this.extCatalog = extCatalog; } - public void setTableExtCatalog(ExternalCatalog extCatalog) {} + public void setTableExtCatalog(ExternalCatalog extCatalog) { + } public void setUnInitialized() { this.initialized = false; @@ -97,7 +101,26 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>, return initialized; } - public void makeSureInitialized() {} + public final synchronized void makeSureInitialized() { + if (!initialized) { + if (!Env.getCurrentEnv().isMaster()) { + // Forward to master and wait the journal to replay. + MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); + try { + remoteExecutor.forward(extCatalog.getId(), id, -1); + } catch (Exception e) { + Util.logAndThrowRuntimeException(LOG, + String.format("failed to forward init external db %s operation to master", name), e); + } + return; + } + init(); + } + } + + protected void init() { + throw new NotImplementedException(); + } public T getTableForReplay(long tableId) { throw new NotImplementedException(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index f713de7419..36189c9a00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -19,12 +19,14 @@ package org.apache.doris.catalog.external; import org.apache.doris.alter.AlterCancelException; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TTableDescriptor; @@ -56,26 +58,20 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { protected String name; @SerializedName(value = "type") protected TableType type = null; - @SerializedName(value = "fullSchema") - protected volatile List<Column> fullSchema = null; - @SerializedName(value = "initialized") - protected boolean initialized = false; @SerializedName(value = "timestamp") protected long timestamp; - - protected ExternalCatalog catalog; @SerializedName(value = "dbName") protected String dbName; + protected boolean objectCreated = false; + protected ExternalCatalog catalog; protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); /** * No args constructor for persist. 
*/ public ExternalTable() { - this.initialized = false; this.objectCreated = false; - this.fullSchema = null; } /** @@ -93,9 +89,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { this.catalog = catalog; this.dbName = dbName; this.type = type; - this.initialized = false; this.objectCreated = false; - this.fullSchema = null; } public void setCatalog(ExternalCatalog catalog) { @@ -106,17 +100,10 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { return false; } - public void setUnInitialized() { - this.initialized = false; - } - - public void replayInitTable(List<Column> schema) { - fullSchema = schema; - initialized = true; + public void makeSureInitialized() { + throw new NotImplementedException(); } - public void makeSureInitialized() {} - @Override public void readLock() { this.rwLock.readLock().lock(); @@ -226,27 +213,35 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { @Override public List<Column> getFullSchema() { - throw new NotImplementedException(); + makeSureInitialized(); + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + return cache.getSchema(dbName, name); } @Override public List<Column> getBaseSchema() { - throw new NotImplementedException(); + return getFullSchema(); } @Override public List<Column> getBaseSchema(boolean full) { - throw new NotImplementedException(); + return getFullSchema(); } + @Override public void setNewFullSchema(List<Column> newSchema) { - this.fullSchema = newSchema; } @Override public Column getColumn(String name) { - throw new NotImplementedException(); + List<Column> schema = getFullSchema(); + for (Column column : schema) { + if (name.equals(column.getName())) { + return column; + } + } + return null; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java index ee7c589019..decef86caa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java @@ -19,12 +19,10 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.InitDatabaseLog; import org.apache.doris.persist.gson.GsonPostProcessable; -import org.apache.doris.qe.MasterCatalogExecutor; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -68,7 +66,6 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl Map<Long, HMSExternalTable> tmpIdToTbl = Maps.newConcurrentMap(); for (int i = 0; i < log.getRefreshCount(); i++) { HMSExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i)); - table.setUnInitialized(); tmpTableNameToId.put(table.getName(), table.getId()); tmpIdToTbl.put(table.getId(), table); } @@ -89,24 +86,8 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl } } - public synchronized void makeSureInitialized() { - if (!initialized) { - if (!Env.getCurrentEnv().isMaster()) { - // Forward to master and wait the journal to replay. 
- MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); - try { - remoteExecutor.forward(extCatalog.getId(), id, -1); - } catch (Exception e) { - Util.logAndThrowRuntimeException(LOG, - String.format("failed to forward init external db %s operation to master", name), e); - } - return; - } - init(); - } - } - - private void init() { + @Override + protected void init() { InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); initDatabaseLog.setType(InitDatabaseLog.Type.HMS); initDatabaseLog.setCatalogId(extCatalog.getId()); @@ -121,7 +102,6 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl tblId = tableNameToId.get(tableName); tmpTableNameToId.put(tableName, tblId); HMSExternalTable table = idToTbl.get(tblId); - table.setUnInitialized(); tmpIdToTbl.put(tblId, table); initDatabaseLog.addRefreshTable(tblId); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 3d8b2e5778..6e12f7e30a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -18,15 +18,9 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.HiveMetaStoreClientHelper; -import org.apache.doris.common.DdlException; +import org.apache.doris.catalog.Type; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.util.Util; import org.apache.doris.datasource.HMSExternalCatalog; -import org.apache.doris.datasource.InitTableLog; -import org.apache.doris.datasource.PooledHiveMetaStoreClient; -import org.apache.doris.qe.MasterCatalogExecutor; import org.apache.doris.thrift.THiveTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -34,9 +28,6 @@ import org.apache.doris.thrift.TTableType; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.ql.exec.SerializationUtilities; -import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -44,6 +35,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Hive metastore external table. @@ -61,6 +53,8 @@ public class HMSExternalTable extends ExternalTable { } private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; + private List<Column> partitionColumns; + private DLAType dlaType = DLAType.UNKNOWN; public enum DLAType { @@ -106,21 +100,6 @@ public class HMSExternalTable extends ExternalTable { } objectCreated = true; } - if (!initialized) { - if (!Env.getCurrentEnv().isMaster()) { - fullSchema = null; - // Forward to master and wait the journal to replay. 
- MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); - try { - remoteExecutor.forward(catalog.getId(), catalog.getDbNullable(dbName).getId(), id); - } catch (Exception e) { - Util.logAndThrowRuntimeException(LOG, - String.format("failed to forward init external table %s operation to master", name), e); - } - return; - } - init(); - } } /** @@ -160,50 +139,10 @@ public class HMSExternalTable extends ExternalTable { * Now we only support three file input format hive tables: parquet/orc/text. And they must be managed_table. */ private boolean supportedHiveTable() { - // boolean isManagedTable = remoteTable.getTableType().equalsIgnoreCase("MANAGED_TABLE"); - // TODO: try to support EXTERNAL_TABLE - boolean isManagedTable = true; String inputFileFormat = remoteTable.getSd().getInputFormat(); boolean supportedFileFormat = inputFileFormat != null && SUPPORTED_HIVE_FILE_FORMATS.contains(inputFileFormat); LOG.debug("hms table {} is {} with file format: {}", name, remoteTable.getTableType(), inputFileFormat); - return isManagedTable && supportedFileFormat; - } - - private void init() { - boolean schemaChanged = false; - List<Column> tmpSchema = Lists.newArrayList(); - if (dlaType.equals(DLAType.UNKNOWN)) { - schemaChanged = true; - } else { - List<FieldSchema> schema = ((HMSExternalCatalog) catalog).getClient().getSchema(dbName, name); - for (FieldSchema field : schema) { - int columnId = (int) Env.getCurrentEnv().getNextId(); - tmpSchema.add(new Column(field.getName(), - HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null, - true, null, field.getComment(), true, null, columnId)); - } - if (fullSchema == null || fullSchema.size() != tmpSchema.size()) { - schemaChanged = true; - } else { - for (int i = 0; i < fullSchema.size(); i++) { - if (!fullSchema.get(i).equals(tmpSchema.get(i))) { - schemaChanged = true; - break; - } - } - } - } - if (schemaChanged) { - timestamp = System.currentTimeMillis(); - fullSchema = tmpSchema; - } - initialized = true; - InitTableLog initTableLog = new InitTableLog(); - initTableLog.setCatalogId(catalog.getId()); - initTableLog.setDbId(catalog.getDbNameToId().get(dbName)); - initTableLog.setTableId(id); - initTableLog.setSchema(fullSchema); - Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog); + return supportedFileFormat; } /** @@ -220,36 +159,25 @@ public class HMSExternalTable extends ExternalTable { return remoteTable; } - @Override - public boolean isView() { - return remoteTable.isSetViewOriginalText() || remoteTable.isSetViewExpandedText(); - } - - @Override - public List<Column> getFullSchema() { + public List<Type> getPartitionColumnTypes() { makeSureInitialized(); - return fullSchema; + initPartitionColumns(); + return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList()); } - @Override - public List<Column> getBaseSchema() { - return getFullSchema(); + public List<Column> getPartitionColumns() { + makeSureInitialized(); + initPartitionColumns(); + return partitionColumns; } - @Override - public List<Column> getBaseSchema(boolean full) { - return getFullSchema(); + public List<String> getPartitionColumnNames() { + return getPartitionColumns().stream().map(c -> c.getName()).collect(Collectors.toList()); } @Override - public Column getColumn(String name) { - makeSureInitialized(); - for (Column column : fullSchema) { - if (name.equals(column.getName())) { - return column; - } - } - return null; + public boolean isView() { + return remoteTable.isSetViewOriginalText() || 
remoteTable.isSetViewExpandedText(); } @Override @@ -321,8 +249,9 @@ public class HMSExternalTable extends ExternalTable { @Override public TTableDescriptor toThrift() { + List<Column> schema = getFullSchema(); THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>()); - TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, fullSchema.size(), 0, + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0, getName(), dbName); tTableDescriptor.setHiveTable(tHiveTable); return tTableDescriptor; @@ -340,12 +269,29 @@ public class HMSExternalTable extends ExternalTable { return catalog.getCatalogProperty().getS3Properties(); } - public List<Partition> getHivePartitions(ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException { - List<Partition> hivePartitions = Lists.newArrayList(); - PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient(); - client.listPartitionsByExpr(remoteTable.getDbName(), remoteTable.getTableName(), - SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), hivePartitions); - return hivePartitions; + private void initPartitionColumns() { + if (partitionColumns != null) { + return; + } + synchronized (this) { + if (partitionColumns != null) { + return; + } + Set<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName) + .collect(Collectors.toSet()); + partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size()); + for (String partitionKey : partitionKeys) { + // Do not use "getColumn()", which will cause dead loop + List<Column> schema = getFullSchema(); + for (Column column : schema) { + if (partitionKey.equals(column.getName())) { + partitionColumns.add(column); + break; + } + } + } + LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index e22913fe52..f0c02c879c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1888,5 +1888,33 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static long max_replica_count_when_schema_change = 100000; + + /** + * Max cache num of hive partition. + * Decrease this value if FE's memory is small + */ + @ConfField(mutable = false, masterOnly = false) + public static long max_hive_partition_cache_num = 100000; + + /** + * Max cache num of external catalog's file + * Decrease this value if FE's memory is small + */ + @ConfField(mutable = false, masterOnly = false) + public static long max_external_file_cache_num = 100000; + + /** + * Max cache num of external table's schema + * Decrease this value if FE's memory is small + */ + @ConfField(mutable = false, masterOnly = false) + public static long max_external_schema_cache_num = 10000; + + /** + * The expiration time of a cache object after last access of it. + * For external schema cache and hive meta cache. 
+     */
+    @ConfField(mutable = false, masterOnly = false)
+    public static long external_cache_expire_time_minutes_after_access = 24 * 60; // 1 day
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CacheException.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CacheException.java
new file mode 100644
index 0000000000..bbb5ec80dc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CacheException.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+public class CacheException extends RuntimeException {
+    public CacheException(String format, Throwable cause, Object... msg) {
+        super(String.format(format, msg), cause);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index e1687b0f7f..39ec822f6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -95,6 +95,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
         CatalogIf catalog = idToCatalog.remove(catalogId);
         if (catalog != null) {
             nameToCatalog.remove(catalog.getName());
+            Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId());
         }
         return catalog;
     }
@@ -423,16 +424,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
         db.replayInitDb(log, catalog);
     }
 
-    public void replayInitExternalTable(InitTableLog log) {
-        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
-        Preconditions.checkArgument(catalog != null);
-        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
-        Preconditions.checkArgument(db != null);
-        ExternalTable table = db.getTableForReplay(log.getTableId());
-        Preconditions.checkArgument(table != null);
-        table.replayInitTable(log.getSchema());
-    }
-
     public void replayRefreshExternalDb(ExternalObjectLog log) {
         writeLock();
         try {
@@ -445,15 +436,11 @@
     }
 
     public void replayRefreshExternalTable(ExternalObjectLog log) {
-        writeLock();
-        try {
-            ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
-            ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
-            ExternalTable table = db.getTableForReplay(log.getTableId());
-            table.setUnInitialized();
-        } finally {
-            writeUnlock();
-        }
+        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        ExternalTable table = db.getTableForReplay(log.getTableId());
+        Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog)
+                .invalidateCache(db.getFullName(),
table.getName()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java index 29123395ed..c5a410eb3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.external.EsExternalDatabase; import org.apache.doris.catalog.external.ExternalDatabase; @@ -190,4 +191,10 @@ public class EsExternalCatalog extends ExternalCatalog { super.gsonPostProcess(); setProperties(this.catalogProperty.getProperties()); } + + @Override + public List<Column> getSchema(String dbName, String tblName) { + makeSureInitialized(); + return EsUtil.genColumnsFromEs(getEsRestClient(), tblName, null); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index ff82a4dda0..0663fed2e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.external.EsExternalDatabase; import org.apache.doris.catalog.external.ExternalDatabase; @@ -62,14 +63,14 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr protected CatalogProperty catalogProperty = new CatalogProperty(); @SerializedName(value = "initialized") private boolean initialized = false; - - // Cache of db name to db id @SerializedName(value = "idToDb") protected Map<Long, ExternalDatabase> idToDb = Maps.newConcurrentMap(); // db name does not contains "default_cluster" protected Map<String, Long> dbNameToId = Maps.newConcurrentMap(); private boolean objectCreated = false; + private ExternalSchemaCache schemaCache; + /** * @return names of database in this catalog. */ @@ -135,6 +136,8 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr throw new NotImplementedException(); } + public abstract List<Column> getSchema(String dbName, String tblName); + @Override public long getId() { return id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java new file mode 100644 index 0000000000..eb9123f814 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Cache meta of external catalogs:
+ * 1. Meta for hive meta store, mainly for partitions.
+ * 2. Table schema cache.
+ */
+public class ExternalMetaCacheMgr {
+    private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class);
+
+    // catalog id -> HiveMetaStoreCache
+    private Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
+    // catalog id -> table schema cache
+    private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newConcurrentMap();
+    private Executor executor;
+
+    public ExternalMetaCacheMgr() {
+        executor = ThreadPoolManager.newDaemonCacheThreadPool(10, "ExternalMetaCacheMgr", false);
+    }
+
+    public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
+        HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
+        if (cache == null) {
+            synchronized (cacheMap) {
+                if (!cacheMap.containsKey(catalog.getId())) {
+                    cacheMap.put(catalog.getId(), new HiveMetaStoreCache(catalog, executor));
+                }
+                cache = cacheMap.get(catalog.getId());
+            }
+        }
+        return cache;
+    }
+
+    public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
+        ExternalSchemaCache cache = schemaCacheMap.get(catalog.getId());
+        if (cache == null) {
+            synchronized (schemaCacheMap) {
+                if (!schemaCacheMap.containsKey(catalog.getId())) {
+                    schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog, executor));
+                }
+                cache = schemaCacheMap.get(catalog.getId());
+            }
+        }
+        return cache;
+    }
+
+    public void removeCache(long catalogId) {
+        if (cacheMap.remove(catalogId) != null) {
+            LOG.info("remove hive metastore cache for catalog {}", catalogId);
+        }
+        if (schemaCacheMap.remove(catalogId) != null) {
+            LOG.info("remove schema cache for catalog {}", catalogId);
+        }
+    }
+
+    public void removeCache(long catalogId, String dbName, String tblName) {
+        ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
+        if (schemaCache != null) {
+            schemaCache.invalidateCache(dbName, tblName);
+            LOG.debug("invalidate schema cache for {}.{} in catalog {}", dbName, tblName, catalogId);
+        }
+        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+        if (metaCache != null) {
+            metaCache.invalidateCache(dbName, tblName);
+            LOG.debug("invalidate meta cache for {}.{} in catalog {}", dbName, tblName, catalogId);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
new file mode 100644
index 0000000000..718525ae6a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.Config; +import org.apache.doris.metric.GaugeMetric; +import org.apache.doris.metric.Metric; +import org.apache.doris.metric.MetricLabel; +import org.apache.doris.metric.MetricRepo; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import lombok.Data; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +// The schema cache for external table +public class ExternalSchemaCache { + private static final Logger LOG = LogManager.getLogger(ExternalSchemaCache.class); + private ExternalCatalog catalog; + + private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache; + + public ExternalSchemaCache(ExternalCatalog catalog, Executor executor) { + this.catalog = catalog; + init(executor); + initMetrics(); + } + + private void init(Executor executor) { + schemaCache = CacheBuilder.newBuilder().maximumSize(Config.max_external_schema_cache_num) + .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(CacheLoader.asyncReloading(new CacheLoader<SchemaCacheKey, ImmutableList<Column>>() { + @Override + public ImmutableList<Column> load(SchemaCacheKey key) throws Exception { + return loadSchema(key); + } + }, executor)); + } + + private void initMetrics() { + // schema cache + GaugeMetric<Long> schemaCacheGauge = new GaugeMetric<Long>("external_schema_cache", + Metric.MetricUnit.NOUNIT, "external schema cache number") { + @Override + public Long getValue() { + return schemaCache.size(); + } + }; + schemaCacheGauge.addLabel(new MetricLabel("catalog", catalog.getName())); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(schemaCacheGauge); + } + + private ImmutableList<Column> loadSchema(SchemaCacheKey key) { + ImmutableList<Column> schema = ImmutableList.copyOf(catalog.getSchema(key.dbName, key.tblName)); + if (LOG.isDebugEnabled()) { + LOG.debug("load schema for {} in catalog {}", key, catalog.getName()); + } + return schema; + } + + public List<Column> getSchema(String dbName, String tblName) { + SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); + try { + return schemaCache.get(key); + } catch (ExecutionException e) { + throw new CacheException("failed to get schema for %s in catalog %s", e, key, catalog.getName()); + } + } + + public void invalidateCache(String dbName, String tblName) { + SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); + schemaCache.invalidate(key); + } + + @Data + public static class SchemaCacheKey { + private String dbName; + private String tblName; + + public SchemaCacheKey(String dbName, String tblName) { + this.dbName = dbName; + this.tblName = tblName; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + 
return true; + } + if (!(obj instanceof SchemaCacheKey)) { + return false; + } + return dbName.equals(((SchemaCacheKey) obj).dbName) && tblName.equals(((SchemaCacheKey) obj).tblName); + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tblName); + } + + @Override + public String toString() { + return "SchemaCacheKey{" + "dbName='" + dbName + '\'' + ", tblName='" + tblName + '\'' + '}'; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSClientException.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSClientException.java new file mode 100644 index 0000000000..fa2652b867 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSClientException.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +public class HMSClientException extends RuntimeException { + public HMSClientException(String format, Throwable cause, Object... msg) { + super(String.format(format, msg), cause); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index f348e93787..fdb2ef2981 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.external.ExternalDatabase; @@ -26,6 +27,7 @@ import org.apache.doris.cluster.ClusterNamespace; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; @@ -159,4 +161,17 @@ public class HMSExternalCatalog extends ExternalCatalog { makeSureInitialized(); return client; } + + @Override + public List<Column> getSchema(String dbName, String tblName) { + makeSureInitialized(); + List<FieldSchema> schema = getClient().getSchema(dbName, tblName); + List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size()); + for (FieldSchema field : schema) { + tmpSchema.add(new Column(field.getName(), + HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null, + true, null, field.getComment(), true, null, -1)); + } + return tmpSchema; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java index c2f3567f56..be7e54eba1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java @@ -40,13 +40,15 @@ import java.util.List; import java.util.Queue; /** - * A hive metastore client pool for a specific hive conf. + * A hive metastore client pool for a specific catalog with hive configuration. */ public class PooledHiveMetaStoreClient { private static final Logger LOG = LogManager.getLogger(PooledHiveMetaStoreClient.class); - private Queue<CachedClient> clientPool = new LinkedList<>(); private static final HiveMetaHookLoader DUMMY_HOOK_LOADER = t -> null; + private static final short MAX_LIST_PARTITION_NUM = 10000; + + private Queue<CachedClient> clientPool = new LinkedList<>(); private final int poolSize; private final HiveConf hiveConf; @@ -62,7 +64,7 @@ public class PooledHiveMetaStoreClient { try (CachedClient client = getClient()) { return client.client.getAllDatabases(); } catch (Exception e) { - throw new RuntimeException(e); + throw new HMSClientException("failed to get all database from hms client", e); } } @@ -70,7 +72,7 @@ public class PooledHiveMetaStoreClient { try (CachedClient client = getClient()) { return client.client.getAllTables(dbName); } catch (Exception e) { - throw new RuntimeException(e); + throw new HMSClientException("failed to get all tables for db %s", e, dbName); } } @@ -78,17 +80,33 @@ public class PooledHiveMetaStoreClient { try (CachedClient client = getClient()) { return client.client.tableExists(dbName, tblName); } catch (Exception e) { - throw new RuntimeException(e); + throw new HMSClientException("failed to check if table %s in db %s exists", e, tblName, dbName); + } + } + + public List<String> listPartitionNames(String dbName, String tblName) { + try (CachedClient client = getClient()) { + return client.client.listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM); + } catch (Exception e) { + throw new HMSClientException("failed to list partition names for table %s in db %s", e, tblName, dbName); + } + } + + public Partition getPartition(String dbName, String tblName, List<String> partitionValues) { + try (CachedClient client = getClient()) { + return client.client.getPartition(dbName, tblName, partitionValues); + } catch (Exception e) { + throw new HMSClientException("failed to get partition for table %s in db %s with value %s", e, tblName, + dbName, partitionValues); } } - public boolean listPartitionsByExpr(String dbName, String tblName, - byte[] partitionPredicatesInBytes, List<Partition> hivePartitions) { + public List<Partition> getPartitionsByFilter(String dbName, String tblName, String filter) { try (CachedClient client = getClient()) { - return client.client.listPartitionsByExpr(dbName, tblName, partitionPredicatesInBytes, - null, (short) -1, hivePartitions); + return client.client.listPartitionsByFilter(dbName, tblName, filter, MAX_LIST_PARTITION_NUM); } catch (Exception e) { - throw new RuntimeException(e); + throw new HMSClientException("failed to get partition by filter for table %s in db %s", e, tblName, + dbName); } } @@ -96,7 +114,7 @@ public class PooledHiveMetaStoreClient { try (CachedClient client = getClient()) { return client.client.getTable(dbName, tblName); } catch (Exception e) { - throw new RuntimeException(e); + throw new HMSClientException("failed to get table %s in db %s from hms client", e, tblName, 
dbName); } } @@ -104,7 +122,7 @@ public class PooledHiveMetaStoreClient { try (CachedClient client = getClient()) { return client.client.getSchema(dbName, tblName); } catch (Exception e) { - throw new RuntimeException(e); + throw new HMSClientException("failed to get schema for table %s in db %s", e, tblName, dbName); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java new file mode 100644 index 0000000000..388c847559 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -0,0 +1,423 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.external.hive.util.HiveUtil; +import org.apache.doris.metric.GaugeMetric; +import org.apache.doris.metric.Metric; +import org.apache.doris.metric.MetricLabel; +import org.apache.doris.metric.MetricRepo; + +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import lombok.Data; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +// The cache of an HMS catalog. 3 kinds of caches: +// 1. partitionValuesCache: cache the partition values of a table, for partition pruning. +// 2. partitionCache: cache the partition info (location, input format, etc.) of a table. +// 3. fileCache: cache the files of a location.
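To make that pattern concrete: all three caches below are built the same way, as Guava LoadingCaches with a bounded size, expire-after-access eviction, and an async-reloading loader on a shared executor. The sketch that follows is illustrative only and not part of this commit; the class name, key type, and loader body (which returns sample partition names in the nation=cn/city=beijing form parsed later by toListPartitionItem) are hypothetical stand-ins, while the Guava calls and the Config knobs named in the comments are the ones actually used:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class MetaCacheSketch {
        // In the FE the reload executor is shared across caches; a small pool suffices here.
        private final ExecutorService reloadPool = Executors.newFixedThreadPool(2);

        private final LoadingCache<String, List<String>> partitionNames = CacheBuilder.newBuilder()
                // bounded entry count, cf. Config.max_hive_partition_cache_num
                .maximumSize(100000)
                // drop entries not touched for a while,
                // cf. Config.external_cache_expire_time_minutes_after_access
                .expireAfterAccess(1440, TimeUnit.MINUTES)
                .build(CacheLoader.asyncReloading(new CacheLoader<String, List<String>>() {
                    @Override
                    public List<String> load(String dbAndTbl) {
                        // hypothetical loader, standing in for an HMS call
                        // such as listPartitionNames(db, tbl, max)
                        return Arrays.asList("nation=cn/city=beijing", "nation=us/city=newyork");
                    }
                }, reloadPool));

        public List<String> get(String dbAndTbl) throws ExecutionException {
            // loads synchronously on first access, then serves the cached value
            return partitionNames.get(dbAndTbl);
        }
    }

With asyncReloading, a refresh() hands back the stale value immediately while the replacement loads on the pool; only a cold get() blocks. Keeping all three caches on this one pattern means a single set of config knobs bounds both memory footprint and staleness for partition values, partition info, and file listings alike.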
+public class HiveMetaStoreCache { + private static final Logger LOG = LogManager.getLogger(HiveMetaStoreCache.class); + private static final int MIN_BATCH_FETCH_PARTITION_NUM = 50; + + private HMSExternalCatalog catalog; + + // cache from <dbname-tblname> -> <values of partitions> + private LoadingCache<PartitionValueCacheKey, ImmutableList<ListPartitionItem>> partitionValuesCache; + // cache from <dbname-tblname-partition_values> -> <partition info> + private LoadingCache<PartitionCacheKey, HivePartition> partitionCache; + // cache from <location> -> <file list> + private LoadingCache<FileCacheKey, ImmutableList<InputSplit>> fileCache; + + public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) { + this.catalog = catalog; + init(executor); + initMetrics(); + } + + private void init(Executor executor) { + partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) + .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(CacheLoader.asyncReloading( + new CacheLoader<PartitionValueCacheKey, ImmutableList<ListPartitionItem>>() { + @Override + public ImmutableList<ListPartitionItem> load(PartitionValueCacheKey key) throws Exception { + return loadPartitionValues(key); + } + }, executor)); + + partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) + .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(CacheLoader.asyncReloading(new CacheLoader<PartitionCacheKey, HivePartition>() { + @Override + public HivePartition load(PartitionCacheKey key) throws Exception { + return loadPartitions(key); + } + }, executor)); + + fileCache = CacheBuilder.newBuilder().maximumSize(Config.max_external_file_cache_num) + .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(CacheLoader.asyncReloading(new CacheLoader<FileCacheKey, ImmutableList<InputSplit>>() { + @Override + public ImmutableList<InputSplit> load(FileCacheKey key) throws Exception { + return loadFiles(key); + } + }, executor)); + } + + private void initMetrics() { + // partition value + GaugeMetric<Long> valueCacheGauge = new GaugeMetric<Long>("hive_meta_cache", + Metric.MetricUnit.NOUNIT, "hive partition value cache number") { + @Override + public Long getValue() { + return partitionValuesCache.size(); + } + }; + valueCacheGauge.addLabel(new MetricLabel("type", "partition_value")); + valueCacheGauge.addLabel(new MetricLabel("catalog", catalog.getName())); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(valueCacheGauge); + // partition + GaugeMetric<Long> partitionCacheGauge = new GaugeMetric<Long>("hive_meta_cache", + Metric.MetricUnit.NOUNIT, "hive partition cache number") { + @Override + public Long getValue() { + return partitionCache.size(); + } + }; + partitionCacheGauge.addLabel(new MetricLabel("type", "partition")); + partitionCacheGauge.addLabel(new MetricLabel("catalog", catalog.getName())); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(partitionCacheGauge); + // file + GaugeMetric<Long> fileCacheGauge = new GaugeMetric<Long>("hive_meta_cache", + Metric.MetricUnit.NOUNIT, "hive file cache number") { + @Override + public Long getValue() { + return fileCache.size(); + } + }; + fileCacheGauge.addLabel(new MetricLabel("type", "file")); + fileCacheGauge.addLabel(new MetricLabel("catalog", catalog.getName())); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(fileCacheGauge); + } + + private 
ImmutableList<ListPartitionItem> loadPartitionValues(PartitionValueCacheKey key) { + // partition name format: nation=cn/city=beijing + List<String> partitionNames = catalog.getClient().listPartitionNames(key.dbName, key.tblName); + if (LOG.isDebugEnabled()) { + LOG.debug("load #{} partitions for {} in catalog {}", partitionNames.size(), key, catalog.getName()); + } + List<ListPartitionItem> partitionValues = Lists.newArrayListWithExpectedSize(partitionNames.size()); + for (String partitionName : partitionNames) { + partitionValues.add(toListPartitionItem(partitionName, key.types)); + } + return ImmutableList.copyOf(partitionValues); + } + + private ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) { + // Partition name will be in format: nation=cn/city=beijing + // parse it to get values "cn" and "beijing" + String[] parts = partitionName.split("/"); + Preconditions.checkState(parts.length == types.size(), partitionName + " vs. " + types); + List<PartitionValue> values = Lists.newArrayListWithExpectedSize(types.size()); + for (String part : parts) { + String[] kv = part.split("="); + Preconditions.checkState(kv.length == 2, partitionName); + values.add(new PartitionValue(kv[1])); + } + try { + PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types); + return new ListPartitionItem(Lists.newArrayList(key)); + } catch (AnalysisException e) { + throw new CacheException("failed to convert hive partition %s to list partition in catalog %s", + e, partitionName, catalog.getName()); + } + } + + private HivePartition loadPartitions(PartitionCacheKey key) { + Partition partition = catalog.getClient().getPartition(key.dbName, key.tblName, key.values); + StorageDescriptor sd = partition.getSd(); + if (LOG.isDebugEnabled()) { + LOG.debug("load partition format: {}, location: {} for {} in catalog {}", + sd.getInputFormat(), sd.getLocation(), key, catalog.getName()); + } + // TODO: more info? + return new HivePartition(sd.getInputFormat(), sd.getLocation(), key.values); + } + + private ImmutableList<InputSplit> loadFiles(FileCacheKey key) { + String finalLocation = convertToS3IfNecessary(key.location); + Configuration conf = getConfiguration(); + JobConf jobConf = new JobConf(conf); + // For Tez engine, it may generate subdirectories for "union" queries. + // So there may be files and directories in the table directory at the same time. eg: + // /user/hive/warehouse/region_tmp_union_all2/000000_0 + // /user/hive/warehouse/region_tmp_union_all2/1 + // /user/hive/warehouse/region_tmp_union_all2/2 + // So we need to set this config to visit directories recursively. + // Otherwise, getSplits() may throw exception: "Not a file xxx" + // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 + jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); + FileInputFormat.setInputPaths(jobConf, finalLocation); + try { + InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(conf, key.inputFormat, false); + InputSplit[] splits = inputFormat.getSplits(jobConf, 0); + if (LOG.isDebugEnabled()) { + LOG.debug("load #{} files for {} in catalog {}", splits.length, key, catalog.getName()); + } + return ImmutableList.copyOf(splits); + } catch (Exception e) { + throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName()); + } + } + + // convert oss:// and other cloud storage prefixes to s3:// + private String convertToS3IfNecessary(String location) { + LOG.debug("try convert location to s3 prefix: " + location); + if (location.startsWith(FeConstants.FS_PREFIX_COS) + || location.startsWith(FeConstants.FS_PREFIX_BOS) + || location.startsWith(FeConstants.FS_PREFIX_OSS) + || location.startsWith(FeConstants.FS_PREFIX_S3A) + || location.startsWith(FeConstants.FS_PREFIX_S3N)) { + int pos = location.indexOf("://"); + if (pos == -1) { + throw new RuntimeException("No '://' found in location: " + location); + } + return "s3" + location.substring(pos); + } + return location; + } + + private Configuration getConfiguration() { + Configuration configuration = new HdfsConfiguration(); + for (Map.Entry<String, String> entry : catalog.getCatalogProperty().getProperties().entrySet()) { + configuration.set(entry.getKey(), entry.getValue()); + } + Map<String, String> s3Properties = catalog.getCatalogProperty().getS3Properties(); + for (Map.Entry<String, String> entry : s3Properties.entrySet()) { + configuration.set(entry.getKey(), entry.getValue()); + } + return configuration; + } + + public ImmutableList<ListPartitionItem> getPartitionValues(String dbName, String tblName, List<Type> types) { + PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, types); + try { + return partitionValuesCache.get(key); + } catch (ExecutionException e) { + throw new CacheException("failed to get partition values for %s in catalog %s", e, key, catalog.getName()); + } + } + + public List<InputSplit> getFilesByPartitions(List<HivePartition> partitions) { + long start = System.currentTimeMillis(); + List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size()); + partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat()))); + + Stream<FileCacheKey> stream; + if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) { + stream = keys.stream(); + } else { + stream = keys.parallelStream(); + } + List<ImmutableList<InputSplit>> fileLists = stream.map(k -> { + try { + return fileCache.get(k); + } catch (ExecutionException e) { + throw new CacheException("failed to get files for %s in catalog %s", e, k, catalog.getName()); + } + }).collect(Collectors.toList()); + List<InputSplit> retFiles = Lists.newArrayListWithExpectedSize( + fileLists.stream().mapToInt(l -> l.size()).sum()); + fileLists.stream().forEach(l -> retFiles.addAll(l)); + LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms", + retFiles.size(), partitions.size(), catalog.getName(), (System.currentTimeMillis() - start)); + return retFiles; + } + + public List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>> partitionValuesList) { + long start = System.currentTimeMillis(); + List<PartitionCacheKey> keys =
Lists.newArrayListWithExpectedSize(partitionValuesList.size()); + partitionValuesList.stream().forEach(p -> keys.add(new PartitionCacheKey(dbName, name, p))); + + Stream<PartitionCacheKey> stream; + if (partitionValuesList.size() < MIN_BATCH_FETCH_PARTITION_NUM) { + stream = keys.stream(); + } else { + stream = keys.parallelStream(); + } + List<HivePartition> partitions = stream.map(k -> { + try { + return partitionCache.get(k); + } catch (ExecutionException e) { + throw new CacheException("failed to get partition for %s in catalog %s", e, k, catalog.getName()); + } + }).collect(Collectors.toList()); + LOG.debug("get #{} partitions in catalog {} cost: {} ms", partitions.size(), catalog.getName(), + (System.currentTimeMillis() - start)); + return partitions; + } + + public void invalidateCache(String dbName, String tblName) { + PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null); + partitionValuesCache.invalidate(key); + // TODO: find a way to invalidate partitionCache and fileCache + } + + /** + * The Key of hive partition value cache + */ + @Data + public static class PartitionValueCacheKey { + private String dbName; + private String tblName; + // not in key + private List<Type> types; + + public PartitionValueCacheKey(String dbName, String tblName, List<Type> types) { + this.dbName = dbName; + this.tblName = tblName; + this.types = types; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof PartitionValueCacheKey)) { + return false; + } + return dbName.equals(((PartitionValueCacheKey) obj).dbName) + && tblName.equals(((PartitionValueCacheKey) obj).tblName); + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tblName); + } + + @Override + public String toString() { + return "PartitionValueCacheKey{" + "dbName='" + dbName + '\'' + ", tblName='" + tblName + '\'' + '}'; + } + } + + @Data + public static class PartitionCacheKey { + private String dbName; + private String tblName; + private List<String> values; + + public PartitionCacheKey(String dbName, String tblName, List<String> values) { + this.dbName = dbName; + this.tblName = tblName; + this.values = values; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof PartitionCacheKey)) { + return false; + } + return dbName.equals(((PartitionCacheKey) obj).dbName) + && tblName.equals(((PartitionCacheKey) obj).tblName) + && Objects.equals(values, ((PartitionCacheKey) obj).values); + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tblName, values); + } + + @Override + public String toString() { + return "PartitionCacheKey{" + "dbName='" + dbName + '\'' + ", tblName='" + tblName + '\'' + ", values=" + + values + '}'; + } + } + + @Data + public static class FileCacheKey { + private String location; + // not in key + private String inputFormat; + + public FileCacheKey(String location, String inputFormat) { + this.location = location; + this.inputFormat = inputFormat; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof FileCacheKey)) { + return false; + } + return location.equals(((FileCacheKey) obj).location); + } + + @Override + public int hashCode() { + return Objects.hash(location); + } + + @Override + public String toString() { + return "FileCacheKey{" + "location='" + location + '\'' + ", inputFormat='" + inputFormat + '\'' + '}'; + } + } +} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java new file mode 100644 index 0000000000..e5b5178e75 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import lombok.Data; + +import java.util.List; + +@Data +public class HivePartition { + private String inputFormat; + private String path; + private List<String> partitionValues; + + public HivePartition(String inputFormat, String path, List<String> partitionValues) { + // eg: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat + this.inputFormat = inputFormat; + // eg: hdfs://hk-dev01:8121/user/doris/parquet/partition_table/nation=cn/city=beijing + this.path = path; + // eg: cn, beijing + this.partitionValues = partitionValues; + } + + @Override + public String toString() { + return "HivePartition{" + + "inputFormat='" + inputFormat + '\'' + + ", path='" + path + '\'' + + ", partitionValues=" + partitionValues + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 39bd5e7dfd..c93ea8d14a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -45,7 +45,6 @@ import org.apache.doris.datasource.CatalogLog; import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.InitDatabaseLog; -import org.apache.doris.datasource.InitTableLog; import org.apache.doris.ha.MasterInfo; import org.apache.doris.journal.Journal; import org.apache.doris.journal.JournalCursor; @@ -951,8 +950,7 @@ public class EditLog { break; } case OperationType.OP_INIT_EXTERNAL_TABLE: { - final InitTableLog log = (InitTableLog) journal.getData(); - env.getCatalogMgr().replayInitExternalTable(log); + // Do nothing. 
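+ // The op is deprecated; this case is kept so that old journals containing + // OP_INIT_EXTERNAL_TABLE entries still replay cleanly as a no-op.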
break; } default: { @@ -1626,10 +1624,6 @@ public class EditLog { logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log); } - public void logInitExternalTable(InitTableLog log) { - logEdit(OperationType.OP_INIT_EXTERNAL_TABLE, log); - } - public Journal getJournal() { return this.journal; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 82304acb23..6204fc1836 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -243,6 +243,7 @@ public class OperationType { public static final short OP_REFRESH_EXTERNAL_DB = 326; public static final short OP_INIT_EXTERNAL_DB = 327; public static final short OP_REFRESH_EXTERNAL_TABLE = 328; + @Deprecated public static final short OP_INIT_EXTERNAL_TABLE = 329; // scheduler job and task 330-350 diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index c522823622..8523830c66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -141,7 +141,6 @@ public abstract class ScanNode extends PlanNode { columnNameToRange.put(column.getName(), columnRange); } } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index 45a7b21747..0cf6661846 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -164,6 +164,8 @@ public class ExternalFileScanNode extends ExternalScanNode { switch (type) { case QUERY: + // prepare for partition prune + computeColumnFilter(); if (this.desc.getTable() instanceof HMSExternalTable) { HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable(); initHMSExternalTable(hmsTable); @@ -199,13 +201,13 @@ public class ExternalFileScanNode extends ExternalScanNode { FileScanProviderIf scanProvider; switch (hmsTable.getDlaType()) { case HUDI: - scanProvider = new HudiScanProvider(hmsTable, desc); + scanProvider = new HudiScanProvider(hmsTable, desc, columnNameToRange); break; case ICEBERG: - scanProvider = new IcebergScanProvider(hmsTable, desc); + scanProvider = new IcebergScanProvider(hmsTable, desc, columnNameToRange); break; case HIVE: - scanProvider = new HiveScanProvider(hmsTable, desc); + scanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange); break; default: throw new UserException("Unknown table type: " + hmsTable.getDlaType()); @@ -511,9 +513,7 @@ public class ExternalFileScanNode extends ExternalScanNode { @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(prefix); - // output.append(fileTable.getExplainString(prefix)); - + StringBuilder output = new StringBuilder(); if (!conjuncts.isEmpty()) { output.append(prefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java index a5e8a7aabc..17e6d3417f 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -22,16 +22,23 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.HiveBucketUtil; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HiveMetaStoreClientHelper; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.external.hive.util.HiveUtil; +import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.datasource.hive.HiveMetaStoreCache; +import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.planner.ColumnRange; +import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileFormatType; @@ -40,23 +47,20 @@ import org.apache.doris.thrift.TFileScanSlotInfo; import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; -import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -75,9 +79,13 @@ public class HiveScanProvider extends HMSTableScanProvider { protected final TupleDescriptor desc; - public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) { + protected Map<String, ColumnRange> columnNameToRange; + + public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc, + Map<String, ColumnRange> columnNameToRange) { this.hmsTable = hmsTable; this.desc = desc; + this.columnNameToRange = columnNameToRange; } @Override @@ -127,80 +135,69 @@ public class HiveScanProvider extends HMSTableScanProvider { } @Override - public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException { - // eg: - // oss://buckts/data_dir - // hdfs://hosts/data_dir - String location = getRemoteHiveTable().getSd().getLocation(); - List<String> partitionKeys = getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName) - 
.collect(Collectors.toList()); - List<Partition> hivePartitions = new ArrayList<>(); - - if (partitionKeys.size() > 0) { - ExprNodeGenericFuncDesc hivePartitionPredicate = HiveMetaStoreClientHelper.convertToHivePartitionExpr(exprs, - partitionKeys, hmsTable.getName()); - hivePartitions.addAll(hmsTable.getHivePartitions(hivePartitionPredicate)); - } + public List<InputSplit> getSplits(List<Expr> exprs) throws UserException { + long start = System.currentTimeMillis(); + try { + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); + // 1. get ListPartitionItems from cache + ImmutableList<ListPartitionItem> partitionItems; + List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes(); + if (!partitionColumnTypes.isEmpty()) { + partitionItems = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(), + partitionColumnTypes); + } else { + partitionItems = ImmutableList.of(); + } - String inputFormatName = getRemoteHiveTable().getSd().getInputFormat(); + List<InputSplit> allFiles = Lists.newArrayList(); + if (!partitionItems.isEmpty()) { + // 2. prune partitions by expr + Map<Long, PartitionItem> keyItemMap = Maps.newHashMap(); + long pid = 0; + for (ListPartitionItem partitionItem : partitionItems) { + keyItemMap.put(pid++, partitionItem); + } + ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(keyItemMap, + hmsTable.getPartitionColumns(), columnNameToRange); + Collection<Long> filteredPartitionIds = pruner.prune(); - Configuration configuration = setConfiguration(); - InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false); - List<InputSplit> splits; - if (!hivePartitions.isEmpty()) { - try { - splits = hivePartitions.stream().flatMap(x -> { - try { - return getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); - } catch (RuntimeException e) { - throw new IOException(e); + // 3. get partitions from cache + List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(filteredPartitionIds.size()); + for (Long id : filteredPartitionIds) { + ListPartitionItem listPartitionItem = (ListPartitionItem) keyItemMap.get(id); + partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList()); + } + List<HivePartition> partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), + partitionValuesList); + // 4. get all files of partitions + getFileSplitByPartitions(cache, partitions, allFiles); + } else { + // unpartitioned table, create a dummy partition to save location and inputformat, + // so that we can unify the interface. 
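+ // The dummy partition reuses the table-level input format and location; + // its partition values stay null.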
+ HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(), + hmsTable.getRemoteTable().getSd().getLocation(), null); + getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles); } - } else { - splits = getSplitsByPath(inputFormat, configuration, location); + LOG.debug("get #{} files for table: {}.{}, cost: {} ms", + allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start)); + return allFiles; + } catch (Throwable t) { + LOG.warn("get file split failed for table: {}", hmsTable.getName(), t); + throw new UserException("get file split failed for table: " + hmsTable.getName(), t); } - return HiveBucketUtil.getPrunedSplitsByBuckets(splits, hmsTable.getName(), exprs, - getRemoteHiveTable().getSd().getBucketCols(), getRemoteHiveTable().getSd().getNumBuckets(), - getRemoteHiveTable().getParameters()); - } - - private List<InputSplit> getSplitsByPath(InputFormat<?, ?> inputFormat, Configuration configuration, - String location) throws IOException { - String finalLocation = convertToS3IfNecessary(location); - JobConf jobConf = new JobConf(configuration); - // For Tez engine, it may generate subdirectoies for "union" query. - // So there may be files and directories in the table directory at the same time. eg: - // /user/hive/warehouse/region_tmp_union_all2/000000_0 - // /user/hive/warehouse/region_tmp_union_all2/1 - // /user/hive/warehouse/region_tmp_union_all2/2 - // So we need to set this config to support visit dir recursively. - // Otherwise, getSplits() may throw exception: "Not a file xxx" - // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 - jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); - FileInputFormat.setInputPaths(jobConf, finalLocation); - InputSplit[] splits = inputFormat.getSplits(jobConf, 0); - return Lists.newArrayList(splits); } - // convert oss:// to s3:// - private String convertToS3IfNecessary(String location) throws IOException { - LOG.debug("try convert location to s3 prefix: " + location); - if (location.startsWith(FeConstants.FS_PREFIX_COS) - || location.startsWith(FeConstants.FS_PREFIX_BOS) - || location.startsWith(FeConstants.FS_PREFIX_BOS) - || location.startsWith(FeConstants.FS_PREFIX_OSS) - || location.startsWith(FeConstants.FS_PREFIX_S3A) - || location.startsWith(FeConstants.FS_PREFIX_S3N)) { - int pos = location.indexOf("://"); - if (pos == -1) { - throw new IOException("No '://' found in location: " + location); - } - return "s3" + location.substring(pos); + private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions, + List<InputSplit> allFiles) { + List<InputSplit> files = cache.getFilesByPartitions(partitions); + if (LOG.isDebugEnabled()) { + LOG.debug("get #{} files from #{} partitions: {}: {}", files.size(), partitions.size(), + Joiner.on(",") + .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath()) + .collect(Collectors.toList()))); } - return location; + allFiles.addAll(files); } protected Configuration setConfiguration() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java index d4c95b2549..59274ec521 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java @@ -21,10 +21,12 @@ import 
org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.planner.ColumnRange; import org.apache.doris.thrift.TFileFormatType; import java.util.Collections; import java.util.List; +import java.util.Map; /** * A file scan provider for hudi. @@ -32,8 +34,9 @@ import java.util.List; */ public class HudiScanProvider extends HiveScanProvider { - public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) { - super(hmsTable, desc); + public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc, + Map<String, ColumnRange> columnNameToRange) { + super(hmsTable, desc, columnNameToRange); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java index f82e277f41..37a5266559 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java @@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.external.iceberg.util.IcebergUtils; +import org.apache.doris.planner.ColumnRange; import org.apache.doris.thrift.TFileFormatType; import org.apache.hadoop.conf.Configuration; @@ -36,7 +37,6 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.expressions.Expression; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -48,8 +48,9 @@ import java.util.Map; */ public class IcebergScanProvider extends HiveScanProvider { - public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) { - super(hmsTable, desc); + public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc, + Map<String, ColumnRange> columnNameToRange) { + super(hmsTable, desc, columnNameToRange); } @Override @@ -69,7 +70,7 @@ public class IcebergScanProvider extends HiveScanProvider { } @Override - public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException { + public List<InputSplit> getSplits(List<Expr> exprs) throws UserException { List<Expression> expressions = new ArrayList<>(); for (Expr conjunct : exprs) { Expression expression = IcebergUtils.convertToIcebergExpr(conjunct); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java index eae1829603..561b2de316 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java @@ -39,8 +39,8 @@ import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.Joiner; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.List; @@ -56,6 +56,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf { @Override public void 
createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, List<TScanRangeLocations> scanRangeLocations) throws UserException { + long start = System.currentTimeMillis(); try { List<InputSplit> inputSplits = getSplits(context.conjuncts); this.inputSplitNum = inputSplits.size(); @@ -100,10 +101,9 @@ public abstract class QueryScanProvider implements FileScanProviderIf { TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - LOG.info( - "Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: " - + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")" - + " loaction: " + Joiner.on("|").join(split.getLocations())); + LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", + curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), + fileSplit.getLength(), Joiner.on("|").join(split.getLocations())); fileSplitStrategy.update(fileSplit); // Add a new location when it's can be split @@ -117,6 +117,8 @@ public abstract class QueryScanProvider implements FileScanProviderIf { if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) { scanRangeLocations.add(curLocations); } + LOG.debug("create #{} ScanRangeLocations cost: {} ms", + scanRangeLocations.size(), (System.currentTimeMillis() - start)); } catch (IOException e) { throw new UserException(e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 58a8e10478..c22cb0c2d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -583,7 +583,7 @@ public class StmtExecutor implements ProfileWriter { throw e; } catch (UserException e) { // analysis exception only print message, not print the stack - LOG.warn("execute Exception. {}, {}", context.getQueryIdentifier(), e.getMessage(), e); + LOG.warn("execute Exception. {}, {}", context.getQueryIdentifier(), e.getMessage()); context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR); } catch (Exception e) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org