This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7182f14645 [improvement][fix](multi-catalog) speed up list partition prune (#14268) 7182f14645 is described below commit 7182f1464518b868c92bb37c19455242b17b171e Author: Mingyu Chen <morning...@163.com> AuthorDate: Thu Nov 17 08:30:03 2022 +0800 [improvement][fix](multi-catalog) speed up list partition prune (#14268) In the previous implementation, when doing list partition prune, we needed to generate `rangeToId` every time we did pruning. But `rangeToId` is actually static data that should be create-once-use-everywhere. So for hive partition, I created the `rangeToId` and all other necessary data structures for partition pruning in the partition cache, so that we can use it directly. In my test, the cost of partition prune for 10000 partitions was reduced from 8s -> 0.2s. Also add "partition" info in explain string for hive table. ``` | 0:VEXTERNAL_FILE_SCAN_NODE | | predicates: `nation` = '0024c95b' | | inputSplitNum=1, totalFileSize=4750, scanRanges=1 | | partition=1/10000 | | numNodes=1 | | limit: 10 | ``` Bug fix: 1. Fix bug that es scan node cannot filter data 2. Fix bug that query es with predicate like `where substring(test2,2) = "ext2";` will fail at planner phase. `Unexpected exception: org.apache.doris.analysis.FunctionCallExpr cannot be cast to org.apache.doris.analysis.SlotRef` TODO: 1. Some problem when querying es version 8: ` Unexpected exception: Index: 0, Size: 0`, will be fixed later. 
--- be/src/vec/exec/scan/new_es_scan_node.cpp | 2 +- be/src/vec/exec/scan/new_es_scanner.cpp | 6 +- be/src/vec/exec/scan/new_es_scanner.h | 2 +- be/src/vec/exec/scan/new_jdbc_scanner.cpp | 2 +- be/src/vec/exec/scan/new_jdbc_scanner.h | 4 +- docker/thirdparties/start-thirdparties-docker.sh | 2 +- .../doris/catalog/external/EsExternalTable.java | 2 +- .../doris/catalog/external/ExternalDatabase.java | 2 +- .../doris/catalog/external/ExternalTable.java | 3 +- .../doris/catalog/external/HMSExternalTable.java | 48 ++++++------- .../apache/doris/datasource/ExternalCatalog.java | 2 +- .../doris/datasource/hive/HiveMetaStoreCache.java | 55 ++++++++++++--- .../doris/external/elasticsearch/EsUtil.java | 6 +- .../doris/planner/ListPartitionPrunerV2.java | 81 ++++++++++++++++------ .../doris/planner/PartitionPrunerV2Base.java | 49 ++++++++----- .../doris/planner/RangePartitionPrunerV2.java | 12 +++- .../planner/external/ExternalFileScanNode.java | 8 +++ .../doris/planner/external/HiveScanProvider.java | 44 +++++++----- .../org/apache/doris/qe/MasterCatalogExecutor.java | 5 +- .../apache/doris/service/FrontendServiceImpl.java | 32 +-------- regression-test/data/es_p0/test_es_query.out | 25 ++++++- regression-test/suites/es_p0/test_es_query.groovy | 19 +++-- 22 files changed, 262 insertions(+), 149 deletions(-) diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index dcad57a418..6a57e917b4 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -205,7 +205,7 @@ Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) { properties, _docvalue_context, doc_value_mode); _scanner_pool.add(scanner); - RETURN_IF_ERROR(scanner->prepare(_state)); + RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); scanners->push_back(static_cast<VScanner*>(scanner)); } return Status::OK(); diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp 
b/be/src/vec/exec/scan/new_es_scanner.cpp index cfde06ab3c..358a226d1e 100644 --- a/be/src/vec/exec/scan/new_es_scanner.cpp +++ b/be/src/vec/exec/scan/new_es_scanner.cpp @@ -41,8 +41,12 @@ NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t l _docvalue_context(docvalue_context), _doc_value_mode(doc_value_mode) {} -Status NewEsScanner::prepare(RuntimeState* state) { +Status NewEsScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) { VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare"; + if (vconjunct_ctx_ptr != nullptr) { + // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. + RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); + } if (_is_init) { return Status::OK(); diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h index be4d50448b..4e82d72af9 100644 --- a/be/src/vec/exec/scan/new_es_scanner.h +++ b/be/src/vec/exec/scan/new_es_scanner.h @@ -36,7 +36,7 @@ public: Status close(RuntimeState* state) override; public: - Status prepare(RuntimeState* state); + Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr); protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index 066074f16a..b91300bf47 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -19,7 +19,7 @@ namespace doris::vectorized { NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, - TupleId tuple_id, std::string query_string) + const TupleId& tuple_id, const std::string& query_string) : VScanner(state, static_cast<VScanNode*>(parent), limit), _is_init(false), _jdbc_eos(false), diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h index 6a45462db1..24c2649dfe 100644 --- 
a/be/src/vec/exec/scan/new_jdbc_scanner.h +++ b/be/src/vec/exec/scan/new_jdbc_scanner.h @@ -25,8 +25,8 @@ namespace doris { namespace vectorized { class NewJdbcScanner : public VScanner { public: - NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, TupleId tuple_id, - std::string query_string); + NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, + const TupleId& tuple_id, const std::string& query_string); Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; diff --git a/docker/thirdparties/start-thirdparties-docker.sh b/docker/thirdparties/start-thirdparties-docker.sh index 8f1a577d9b..986d04da7a 100755 --- a/docker/thirdparties/start-thirdparties-docker.sh +++ b/docker/thirdparties/start-thirdparties-docker.sh @@ -31,7 +31,7 @@ ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" CONTAINER_UID="doris--" # elasticsearch -sed -i "" "s/doris--/${CONTAINER_UID}/g" "${ROOT}"/docker-compose/elasticsearch/es.yaml +sed -i "s/doris--/${CONTAINER_UID}/g" "${ROOT}"/docker-compose/elasticsearch/es.yaml sudo docker compose -f "${ROOT}"/docker-compose/elasticsearch/es.yaml --env-file "${ROOT}"/docker-compose/elasticsearch/es.env down sudo mkdir -p "${ROOT}"/docker-compose/elasticsearch/data/es6/ sudo rm -rf "${ROOT}"/docker-compose/elasticsearch/data/es6/* diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java index eb8c8972b1..e2efd6aae1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java @@ -49,7 +49,7 @@ public class EsExternalTable extends ExternalTable { super(id, name, catalog, dbName, TableType.ES_EXTERNAL_TABLE); } - public synchronized void makeSureInitialized() { + protected synchronized void makeSureInitialized() { if 
(!objectCreated) { esTable = toEsTable(); objectCreated = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index 87d1cccf5d..ffbbda4a46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -107,7 +107,7 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>, // Forward to master and wait the journal to replay. MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); try { - remoteExecutor.forward(extCatalog.getId(), id, -1); + remoteExecutor.forward(extCatalog.getId(), id); } catch (Exception e) { Util.logAndThrowRuntimeException(LOG, String.format("failed to forward init external db %s operation to master", name), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 36189c9a00..e97c5b1a26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -100,7 +100,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { return false; } - public void makeSureInitialized() { + protected void makeSureInitialized() { throw new NotImplementedException(); } @@ -213,7 +213,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { @Override public List<Column> getFullSchema() { - makeSureInitialized(); ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); return cache.getSchema(dbName, name); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 6e12f7e30a..168cf31a65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -78,7 +78,7 @@ public class HMSExternalTable extends ExternalTable { return dlaType != DLAType.UNKNOWN; } - public synchronized void makeSureInitialized() { + protected synchronized void makeSureInitialized() { if (!objectCreated) { try { getRemoteTable(); @@ -98,10 +98,29 @@ public class HMSExternalTable extends ExternalTable { dlaType = DLAType.UNKNOWN; } } + + initPartitionColumns(); objectCreated = true; } } + private void initPartitionColumns() { + Set<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName) + .collect(Collectors.toSet()); + partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size()); + for (String partitionKey : partitionKeys) { + // Do not use "getColumn()", which will cause dead loop + List<Column> schema = getFullSchema(); + for (Column column : schema) { + if (partitionKey.equals(column.getName())) { + partitionColumns.add(column); + break; + } + } + } + LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name); + } + /** * Now we only support cow table in iceberg. 
*/ @@ -161,13 +180,11 @@ public class HMSExternalTable extends ExternalTable { public List<Type> getPartitionColumnTypes() { makeSureInitialized(); - initPartitionColumns(); return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList()); } public List<Column> getPartitionColumns() { makeSureInitialized(); - initPartitionColumns(); return partitionColumns; } @@ -268,30 +285,5 @@ public class HMSExternalTable extends ExternalTable { public Map<String, String> getS3Properties() { return catalog.getCatalogProperty().getS3Properties(); } - - private void initPartitionColumns() { - if (partitionColumns != null) { - return; - } - synchronized (this) { - if (partitionColumns != null) { - return; - } - Set<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName) - .collect(Collectors.toSet()); - partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size()); - for (String partitionKey : partitionKeys) { - // Do not use "getColumn()", which will cause dead loop - List<Column> schema = getFullSchema(); - for (Column column : schema) { - if (partitionKey.equals(column.getName())) { - partitionColumns.add(column); - break; - } - } - } - LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 0663fed2e1..385c85fadf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -102,7 +102,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr // Forward to master and wait the journal to replay. 
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); try { - remoteExecutor.forward(id, -1, -1); + remoteExecutor.forward(id, -1); } catch (Exception e) { Util.logAndThrowRuntimeException(LOG, String.format("failed to forward init catalog %s operation to master.", name), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 388c847559..ddcc12d5fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -19,6 +19,7 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.PartitionValue; import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; @@ -31,6 +32,9 @@ import org.apache.doris.metric.GaugeMetric; import org.apache.doris.metric.Metric; import org.apache.doris.metric.MetricLabel; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.planner.ColumnBound; +import org.apache.doris.planner.ListPartitionPrunerV2; +import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; @@ -38,6 +42,9 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; import lombok.Data; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -70,7 +77,7 @@ public class HiveMetaStoreCache { private HMSExternalCatalog catalog; // cache 
from <dbname-tblname> -> <values of partitions> - private LoadingCache<PartitionValueCacheKey, ImmutableList<ListPartitionItem>> partitionValuesCache; + private LoadingCache<PartitionValueCacheKey, HivePartitionValues> partitionValuesCache; // cache from <dbname-tblname-partition_values> -> <partition info> private LoadingCache<PartitionCacheKey, HivePartition> partitionCache; // cache from <location> -> <file list> @@ -86,9 +93,9 @@ public class HiveMetaStoreCache { partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) .build(CacheLoader.asyncReloading( - new CacheLoader<PartitionValueCacheKey, ImmutableList<ListPartitionItem>>() { + new CacheLoader<PartitionValueCacheKey, HivePartitionValues>() { @Override - public ImmutableList<ListPartitionItem> load(PartitionValueCacheKey key) throws Exception { + public HivePartitionValues load(PartitionValueCacheKey key) throws Exception { return loadPartitionValues(key); } }, executor)); @@ -148,17 +155,31 @@ public class HiveMetaStoreCache { MetricRepo.DORIS_METRIC_REGISTER.addMetrics(fileCacheGauge); } - private ImmutableList<ListPartitionItem> loadPartitionValues(PartitionValueCacheKey key) { + private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) { // partition name format: nation=cn/city=beijing List<String> partitionNames = catalog.getClient().listPartitionNames(key.dbName, key.tblName); if (LOG.isDebugEnabled()) { LOG.debug("load #{} partitions for {} in catalog {}", partitionNames.size(), key, catalog.getName()); } - List<ListPartitionItem> partitionValues = Lists.newArrayListWithExpectedSize(partitionNames.size()); + Map<Long, PartitionItem> idToPartitionItem = Maps.newHashMapWithExpectedSize(partitionNames.size()); + long idx = 0; for (String partitionName : partitionNames) { - partitionValues.add(toListPartitionItem(partitionName, key.types)); + 
idToPartitionItem.put(idx++, toListPartitionItem(partitionName, key.types)); } - return ImmutableList.copyOf(partitionValues); + + Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null; + Map<Range<PartitionKey>, UniqueId> rangeToId = null; + RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null; + if (key.types.size() > 1) { + // uidToPartitionRange and rangeToId are only used for multi-column partition + uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem); + rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange); + } else { + Preconditions.checkState(key.types.size() == 1, key.types); + // singleColumnRangeMap is only used for single-column partition + singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem); + } + return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap); } private ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) { @@ -248,7 +269,7 @@ public class HiveMetaStoreCache { return configuration; } - public ImmutableList<ListPartitionItem> getPartitionValues(String dbName, String tblName, List<Type> types) { + public HivePartitionValues getPartitionValues(String dbName, String tblName, List<Type> types) { PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, types); try { return partitionValuesCache.get(key); @@ -420,4 +441,22 @@ public class HiveMetaStoreCache { return "FileCacheKey{" + "location='" + location + '\'' + ", inputFormat='" + inputFormat + '\'' + '}'; } } + + @Data + public static class HivePartitionValues { + private Map<Long, PartitionItem> idToPartitionItem = Maps.newHashMap(); + private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange; + private Map<Range<PartitionKey>, UniqueId> rangeToId; + private RangeMap<ColumnBound, UniqueId> singleColumnRangeMap; + + public HivePartitionValues(Map<Long, PartitionItem> idToPartitionItem, + Map<UniqueId, 
Range<PartitionKey>> uidToPartitionRange, + Map<Range<PartitionKey>, UniqueId> rangeToId, + RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) { + this.idToPartitionItem = idToPartitionItem; + this.uidToPartitionRange = uidToPartitionRange; + this.rangeToId = rangeToId; + this.singleColumnRangeMap = singleColumnRangeMap; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java index b8dd5f8d84..8a8055f72c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java @@ -274,8 +274,11 @@ public class EsUtil { notPushDownList.add(expr); return null; } - } else { + } else if (leftExpr instanceof SlotRef) { column = ((SlotRef) leftExpr).getColumnName(); + } else { + notPushDownList.add(expr); + return null; } // Replace col with col.keyword if mapping exist. column = fieldsContext.getOrDefault(column, column); @@ -448,3 +451,4 @@ public class EsUtil { } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java index 6d45efc8dd..bbefea50e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java @@ -24,11 +24,14 @@ import org.apache.doris.common.AnalysisException; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; import com.google.common.collect.RangeSet; import com.google.common.collect.TreeRangeMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Collection; 
import java.util.Collections; @@ -40,33 +43,62 @@ import java.util.stream.Collectors; /** * ListPartitionPrunerV2 + * * @since 1.0 */ @SuppressWarnings("UnstableApiUsage") public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { - private final Map<UniqueId, Range<PartitionKey>> uidToPartitionRange; + private static final Logger LOG = LogManager.getLogger(ListPartitionPrunerV2.class); + // `uidToPartitionRange` is only used for multiple columns partition. + private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange; + private Map<Range<PartitionKey>, UniqueId> rangeToId; public ListPartitionPrunerV2(Map<Long, PartitionItem> idToPartitionItem, - List<Column> partitionColumns, - Map<String, ColumnRange> columnNameToRange) { + List<Column> partitionColumns, + Map<String, ColumnRange> columnNameToRange) { super(idToPartitionItem, partitionColumns, columnNameToRange); this.uidToPartitionRange = Maps.newHashMap(); if (partitionColumns.size() > 1) { - // `uidToPartitionRange` is only used for multiple columns partition. 
- idToPartitionItem.forEach((id, item) -> { - List<PartitionKey> keys = item.getItems(); - List<Range<PartitionKey>> ranges = keys.stream() - .map(key -> Range.closed(key, key)) - .collect(Collectors.toList()); - for (int i = 0; i < ranges.size(); i++) { - uidToPartitionRange.put(new ListPartitionUniqueId(id, i), ranges.get(i)); - } - }); + this.uidToPartitionRange = genUidToPartitionRange(idToPartitionItem); + this.rangeToId = genRangeToId(uidToPartitionRange); } } + // Pass uidToPartitionRange and rangeToId from outside + public ListPartitionPrunerV2(Map<Long, PartitionItem> idToPartitionItem, + List<Column> partitionColumns, + Map<String, ColumnRange> columnNameToRange, + Map<UniqueId, Range<PartitionKey>> uidToPartitionRange, + Map<Range<PartitionKey>, UniqueId> rangeToId, + RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) { + super(idToPartitionItem, partitionColumns, columnNameToRange, singleColumnRangeMap); + this.uidToPartitionRange = uidToPartitionRange; + this.rangeToId = rangeToId; + } + + public static Map<UniqueId, Range<PartitionKey>> genUidToPartitionRange( + Map<Long, PartitionItem> idToPartitionItem) { + Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = Maps.newHashMap(); + idToPartitionItem.forEach((id, item) -> { + List<PartitionKey> keys = item.getItems(); + List<Range<PartitionKey>> ranges = keys.stream() + .map(key -> Range.closed(key, key)) + .collect(Collectors.toList()); + for (int i = 0; i < ranges.size(); i++) { + uidToPartitionRange.put(new ListPartitionUniqueId(id, i), ranges.get(i)); + } + }); + return uidToPartitionRange; + } + @Override - RangeMap<ColumnBound, UniqueId> getCandidateRangeMap() { + void genSingleColumnRangeMap() { + if (singleColumnRangeMap == null) { + singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem); + } + } + + public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) { RangeMap<ColumnBound, UniqueId> candidate = 
TreeRangeMap.create(); idToPartitionItem.forEach((id, item) -> { List<PartitionKey> keys = item.getItems(); @@ -75,7 +107,7 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { .collect(Collectors.toList()); for (int i = 0; i < ranges.size(); i++) { candidate.put(mapPartitionKeyRange(ranges.get(i), 0), - new ListPartitionUniqueId(id, i)); + new ListPartitionUniqueId(id, i)); } }); return candidate; @@ -86,7 +118,7 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { */ @Override FinalFilters getFinalFilters(ColumnRange columnRange, - Column column) throws AnalysisException { + Column column) throws AnalysisException { if (!columnRange.hasFilter()) { return FinalFilters.noFilters(); } @@ -107,19 +139,26 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { @Override Collection<Long> pruneMultipleColumnPartition( Map<Column, FinalFilters> columnToFilters) throws AnalysisException { + Preconditions.checkNotNull(uidToPartitionRange); + Preconditions.checkNotNull(rangeToId); + return doPruneMultiple(columnToFilters, rangeToId, 0); + } + + public static Map<Range<PartitionKey>, UniqueId> genRangeToId( + Map<UniqueId, Range<PartitionKey>> uidToPartitionRange) { Map<Range<PartitionKey>, UniqueId> rangeToId = Maps.newHashMap(); uidToPartitionRange.forEach((uid, range) -> rangeToId.put(range, uid)); - return doPruneMultiple(columnToFilters, rangeToId, 0); + return rangeToId; } private Collection<Long> doPruneMultiple(Map<Column, FinalFilters> columnToFilters, - Map<Range<PartitionKey>, UniqueId> partitionRangeToUid, - int columnIdx) { + Map<Range<PartitionKey>, UniqueId> partitionRangeToUid, + int columnIdx) { // No more partition column. 
if (columnIdx == partitionColumns.size()) { return partitionRangeToUid.values().stream() - .map(UniqueId::getPartitionId) - .collect(Collectors.toSet()); + .map(UniqueId::getPartitionId) + .collect(Collectors.toSet()); } FinalFilters finalFilters = columnToFilters.get(partitionColumns.get(columnIdx)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java index d2b169d3de..e1772509ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.common.AnalysisException; +import com.google.common.base.Preconditions; import com.google.common.collect.BoundType; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -40,15 +41,28 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { protected final Map<Long, PartitionItem> idToPartitionItem; protected final List<Column> partitionColumns; protected final Map<String, ColumnRange> columnNameToRange; + // used for single column partition + protected RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null; public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem, - List<Column> partitionColumns, - Map<String, ColumnRange> columnNameToRange) { + List<Column> partitionColumns, + Map<String, ColumnRange> columnNameToRange) { this.idToPartitionItem = idToPartitionItem; this.partitionColumns = partitionColumns; this.columnNameToRange = columnNameToRange; } + // pass singleColumnRangeMap from outside + public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem, + List<Column> partitionColumns, + Map<String, ColumnRange> columnNameToRange, + RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) { + 
this.idToPartitionItem = idToPartitionItem; + this.partitionColumns = partitionColumns; + this.columnNameToRange = columnNameToRange; + this.singleColumnRangeMap = singleColumnRangeMap; + } + @Override public Collection<Long> prune() throws AnalysisException { Map<Column, FinalFilters> columnToFilters = Maps.newHashMap(); @@ -70,7 +84,7 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { } } - abstract RangeMap<ColumnBound, UniqueId> getCandidateRangeMap(); + abstract void genSingleColumnRangeMap(); /** * Handle conjunctive and disjunctive `is null` predicates. @@ -107,29 +121,30 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { case CONSTANT_FALSE_FILTERS: return Collections.emptyList(); case HAVE_FILTERS: - RangeMap<ColumnBound, UniqueId> candidate = getCandidateRangeMap(); + genSingleColumnRangeMap(); + Preconditions.checkNotNull(singleColumnRangeMap); return finalFilters.filters.stream() - .map(filter -> { - RangeMap<ColumnBound, UniqueId> filtered = candidate.subRangeMap(filter); - return filtered.asMapOfRanges().values().stream() - .map(UniqueId::getPartitionId) - .collect(Collectors.toSet()); - }) - .flatMap(Set::stream) - .collect(Collectors.toSet()); + .map(filter -> { + RangeMap<ColumnBound, UniqueId> filtered = singleColumnRangeMap.subRangeMap(filter); + return filtered.asMapOfRanges().values().stream() + .map(UniqueId::getPartitionId) + .collect(Collectors.toSet()); + }) + .flatMap(Set::stream) + .collect(Collectors.toSet()); case NO_FILTERS: default: return idToPartitionItem.keySet(); } } - protected Range<ColumnBound> mapPartitionKeyRange(Range<PartitionKey> fromRange, - int columnIdx) { + protected static Range<ColumnBound> mapPartitionKeyRange(Range<PartitionKey> fromRange, + int columnIdx) { return mapRange(fromRange, - partitionKey -> ColumnBound.of(partitionKey.getKeys().get(columnIdx))); + partitionKey -> ColumnBound.of(partitionKey.getKeys().get(columnIdx))); } - protected <TO extends 
Comparable, FROM extends Comparable> Range<TO> mapRange( + private static <TO extends Comparable, FROM extends Comparable> Range<TO> mapRange( Range<FROM> range, Function<FROM, TO> mapper) { TO lower = range.hasLowerBound() ? mapper.apply(range.lowerEndpoint()) : null; TO upper = range.hasUpperBound() ? mapper.apply(range.upperEndpoint()) : null; @@ -157,7 +172,7 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { } } - protected interface UniqueId { + public interface UniqueId { long getPartitionId(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java index 256c5c11f4..4aa3ee41a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java @@ -43,13 +43,19 @@ import java.util.Set; public class RangePartitionPrunerV2 extends PartitionPrunerV2Base { public RangePartitionPrunerV2(Map<Long, PartitionItem> idToPartitionItem, - List<Column> partitionColumns, - Map<String, ColumnRange> columnNameToRange) { + List<Column> partitionColumns, + Map<String, ColumnRange> columnNameToRange) { super(idToPartitionItem, partitionColumns, columnNameToRange); } @Override - RangeMap<ColumnBound, UniqueId> getCandidateRangeMap() { + void genSingleColumnRangeMap() { + if (singleColumnRangeMap == null) { + singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem); + } + } + + public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) { RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create(); idToPartitionItem.forEach((id, item) -> { Range<PartitionKey> range = item.getItems(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java 
index 0cf6661846..0dd425d917 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -121,6 +121,8 @@ public class ExternalFileScanNode extends ExternalScanNode { // For explain private long inputSplitsNum = 0; private long totalFileSize = 0; + private long totalPartitionNum = 0; + private long readPartitionNum = 0; /** * External file scan node for: @@ -302,6 +304,10 @@ public class ExternalFileScanNode extends ExternalScanNode { createScanRangeLocations(context, scanProvider); this.inputSplitsNum += scanProvider.getInputSplitNum(); this.totalFileSize += scanProvider.getInputFileSize(); + if (scanProvider instanceof HiveScanProvider) { + this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); + this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); + } } } @@ -524,6 +530,8 @@ public class ExternalFileScanNode extends ExternalScanNode { output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=") .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n"); + output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum) + .append("\n"); output.append(prefix); if (cardinality > 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java index 17e6d3417f..e257f96359 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -35,6 +35,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.HMSExternalCatalog; import 
org.apache.doris.datasource.hive.HiveMetaStoreCache; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.ColumnRange; @@ -48,7 +49,6 @@ import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; @@ -76,11 +76,12 @@ public class HiveScanProvider extends HMSTableScanProvider { private static final String DEFAULT_LINE_DELIMITER = "\n"; protected HMSExternalTable hmsTable; - protected final TupleDescriptor desc; - protected Map<String, ColumnRange> columnNameToRange; + protected int totalPartitionNum = 0; + protected int readPartitionNum = 0; + public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc, Map<String, ColumnRange> columnNameToRange) { this.hmsTable = hmsTable; @@ -141,31 +142,32 @@ public class HiveScanProvider extends HMSTableScanProvider { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); // 1. get ListPartitionItems from cache - ImmutableList<ListPartitionItem> partitionItems; + HivePartitionValues hivePartitionValues = null; List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes(); if (!partitionColumnTypes.isEmpty()) { - partitionItems = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(), + hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(), partitionColumnTypes); - } else { - partitionItems = ImmutableList.of(); } List<InputSplit> allFiles = Lists.newArrayList(); - if (!partitionItems.isEmpty()) { + if (hivePartitionValues != null) { // 2. 
prune partitions by expr - Map<Long, PartitionItem> keyItemMap = Maps.newHashMap(); - long pid = 0; - for (ListPartitionItem partitionItem : partitionItems) { - keyItemMap.put(pid++, partitionItem); - } - ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(keyItemMap, - hmsTable.getPartitionColumns(), columnNameToRange); + Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); + this.totalPartitionNum = idToPartitionItem.size(); + ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem, + hmsTable.getPartitionColumns(), columnNameToRange, + hivePartitionValues.getUidToPartitionRange(), + hivePartitionValues.getRangeToId(), + hivePartitionValues.getSingleColumnRangeMap()); Collection<Long> filteredPartitionIds = pruner.prune(); + this.readPartitionNum = filteredPartitionIds.size(); + LOG.debug("hive partition fetch and prune for table {}.{} cost: {} ms", + hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start)); // 3. 
get partitions from cache List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(filteredPartitionIds.size()); for (Long id : filteredPartitionIds) { - ListPartitionItem listPartitionItem = (ListPartitionItem) keyItemMap.get(id); + ListPartitionItem listPartitionItem = (ListPartitionItem) idToPartitionItem.get(id); partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList()); } List<HivePartition> partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), @@ -178,6 +180,8 @@ public class HiveScanProvider extends HMSTableScanProvider { HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(), hmsTable.getRemoteTable().getSd().getLocation(), null); getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles); + this.totalPartitionNum = 1; + this.readPartitionNum = 1; } LOG.debug("get #{} files for table: {}.{}, cost: {} ms", allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start)); @@ -212,6 +216,14 @@ public class HiveScanProvider extends HMSTableScanProvider { return conf; } + public int getTotalPartitionNum() { + return totalPartitionNum; + } + + public int getReadPartitionNum() { + return readPartitionNum; + } + @Override public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException { return hmsTable.getRemoteTable(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java index c3a08d6d50..a26c687e10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java @@ -43,7 +43,7 @@ public class MasterCatalogExecutor { waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000; } - public void forward(long catalogId, long dbId, long tableId) throws Exception { + public void 
forward(long catalogId, long dbId) throws Exception { if (!ctx.getEnv().isReady()) { throw new Exception("Current catalog is not ready, please wait for a while."); } @@ -62,9 +62,6 @@ public class MasterCatalogExecutor { if (dbId != -1) { request.setDbId(dbId); } - if (tableId != -1) { - request.setTableId(tableId); - } boolean isReturnToPool = false; try { TInitExternalCtlMetaResult result = client.initExternalCtlMeta(request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 57a7cb58d8..a7a128b567 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1194,9 +1194,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TInitExternalCtlMetaResult initExternalCtlMeta(TInitExternalCtlMetaRequest request) throws TException { - if (request.isSetCatalogId() && request.isSetDbId() && request.isSetTableId()) { - return initTable(request.catalogId, request.dbId, request.tableId); - } else if (request.isSetCatalogId() && request.isSetDbId()) { + if (request.isSetCatalogId() && request.isSetDbId()) { return initDb(request.catalogId, request.dbId); } else if (request.isSetCatalogId()) { return initCatalog(request.catalogId); @@ -1235,32 +1233,4 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setStatus("OK"); return result; } - - private TInitExternalCtlMetaResult initTable(long catalogId, long dbId, long tableId) - throws TException { - CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); - if (!(catalog instanceof ExternalCatalog)) { - throw new TException("Only support forward ExternalCatalog init operation."); - } - DatabaseIf db = catalog.getDbNullable(dbId); - if (db == null) { - throw new TException("database " + dbId + " is null"); - } - if 
(!(db instanceof ExternalDatabase)) { - throw new TException("Only support forward ExternalDatabase init operation."); - } - TableIf table = db.getTableNullable(tableId); - if (table == null) { - throw new TException("table " + tableId + " is null"); - } - if (!(table instanceof ExternalTable)) { - throw new TException("Only support forward ExternalTable init operation."); - } - - ((ExternalTable) table).makeSureInitialized(); - TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult(); - result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId()); - result.setStatus("OK"); - return result; - } } diff --git a/regression-test/data/es_p0/test_es_query.out b/regression-test/data/es_p0/test_es_query.out index 651370b242..29768407a7 100644 --- a/regression-test/data/es_p0/test_es_query.out +++ b/regression-test/data/es_p0/test_es_query.out @@ -1,12 +1,31 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !sql1 -- +-- !sql61 -- test1 test2 test2_20220808 --- !sql1 -- +-- !sql62 -- 2022-08-08 text#1 3.14 string1 --- !sql1 -- +-- !sql63 -- 2022-08-08 text#1 3.14 string1 +2022-08-08 text3_4*5 5.0 string3 + +-- !sql64 -- +2022-09-08 text2 4.0 string2 + +-- !sql71 -- +test1 +test2 +test2_20220808 + +-- !sql72 -- +2022-08-08 text#1 3.14 string1 + +-- !sql73 -- +2022-08-08 text#1 3.14 string1 +2022-08-08 text3_4*5 5.0 string3 + +-- !sql74 -- +2022-09-08 text2 4.0 string2 diff --git a/regression-test/suites/es_p0/test_es_query.groovy b/regression-test/suites/es_p0/test_es_query.groovy index 2e8ec3c392..04b939a51d 100644 --- a/regression-test/suites/es_p0/test_es_query.groovy +++ b/regression-test/suites/es_p0/test_es_query.groovy @@ -54,9 +54,18 @@ suite("test_es_query", "p0") { ); """ sql """switch es6""" - order_qt_sql1 """show tables""" - order_qt_sql1 """select * from test1 where test2='text#1'""" - sql """switch es8""" - order_qt_sql1 """select * from test1 where test2='text'""" + order_qt_sql61 """show 
tables""" + order_qt_sql62 """select * from test1 where test2='text#1'""" + order_qt_sql63 """select * from test2_20220808 where test4='2022-08-08'""" + order_qt_sql64 """select * from test2_20220808 where substring(test2, 2) = 'ext2'""" + sql """switch es7""" + order_qt_sql71 """show tables""" + order_qt_sql72 """select * from test1 where test2='text#1'""" + order_qt_sql73 """select * from test2_20220808 where test4='2022-08-08'""" + order_qt_sql74 """select * from test2_20220808 where substring(test2, 2) = 'ext2'""" + // es8 has some problem, need fix + // sql """switch es8""" + // order_qt_sql1 """select * from test1 where test2='text'""" + // order_qt_sql2 """select * from test2_20220808 where test4='2022-08-08'""" } -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org