This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 265c26f [Doris On ES] Add docvalue limitation for doc_values scan and enable doc_values scan default (#4055) 265c26f is described below commit 265c26f67d80ec7074572c33bb3d71a773781bb2 Author: Yunfeng,Wu <wuyunfen...@baidu.com> AuthorDate: Fri Jul 10 18:37:36 2020 +0800 [Doris On ES] Add docvalue limitation for doc_values scan and enable doc_values scan default (#4055) --- be/src/exec/es/es_scan_reader.h | 1 + be/src/exec/es/es_scroll_query.cpp | 20 +++++--- .../java/org/apache/doris/catalog/Catalog.java | 1 + .../java/org/apache/doris/catalog/EsTable.java | 52 +++++++++++++++++++-- .../java/org/apache/doris/planner/EsScanNode.java | 54 +++++++++++++++++----- 5 files changed, 105 insertions(+), 23 deletions(-) diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h index 3bd64cf..52b936c 100644 --- a/be/src/exec/es/es_scan_reader.h +++ b/be/src/exec/es/es_scan_reader.h @@ -40,6 +40,7 @@ public: static constexpr const char* KEY_QUERY = "query"; static constexpr const char* KEY_BATCH_SIZE = "batch_size"; static constexpr const char* KEY_TERMINATE_AFTER = "limit"; + static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode"; ESScanReader(const std::string& target, const std::map<std::string, std::string>& props, bool doc_value_mode); ~ESScanReader(); diff --git a/be/src/exec/es/es_scroll_query.cpp b/be/src/exec/es/es_scroll_query.cpp index 0c4f581..90d68f0 100644 --- a/be/src/exec/es/es_scroll_query.cpp +++ b/be/src/exec/es/es_scroll_query.cpp @@ -76,14 +76,20 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string> // note: add `query` for this value.... es_query_dsl.AddMember("query", query_node, allocator); bool pure_docvalue = true; - // check docvalue sacan optimization - if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) { - pure_docvalue = false; + + // Doris FE already has checked docvalue-scan optimization + if (properties.find(ESScanReader::KEY_DOC_VALUES_MODE) != properties.end()) { + pure_docvalue = atoi(properties.at(ESScanReader::KEY_DOC_VALUES_MODE).c_str()); } else { - for (auto& select_field : fields) { - if (docvalue_context.find(select_field) == docvalue_context.end()) { - pure_docvalue = false; - break; + // check docvalue scan optimization, used for compatibility + if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) { + pure_docvalue = false; + } else { + for (auto& select_field : fields) { + if (docvalue_context.find(select_field) == docvalue_context.end()) { + pure_docvalue = false; + break; + } } } } diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index d1a4460..efeda65 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4036,6 +4036,7 @@ public class Catalog { sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n"); sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\",\n"); sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n"); + sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n"); sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n"); sb.append(")"); } else if (table.getType() == TableType.HIVE) { diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java index c885fb5..19fa6f1 100644 --- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java @@ -57,28 +57,51 @@ public class EsTable extends Table { public static final String TYPE = "type"; public static final String TRANSPORT = "transport"; public static final String VERSION = "version"; + public static final String DOC_VALUES_MODE = "doc_values_mode"; public static final String TRANSPORT_HTTP = "http"; public static final String TRANSPORT_THRIFT = "thrift"; public static final String DOC_VALUE_SCAN = "enable_docvalue_scan"; public static final String KEYWORD_SNIFF = "enable_keyword_sniff"; + public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields"; private String hosts; private String[] seeds; private String userName = ""; private String passwd = ""; + // index name can be specific index、wildcard matched or alias. private String indexName; + + // which type used for `indexName`, default to `_doc` private String mappingType = "_doc"; private String transport = "http"; // only save the partition definition, save the partition key, // partition list is got from es cluster dynamically and is saved in esTableState private PartitionInfo partitionInfo; private EsTablePartitions esTablePartitions; - private boolean enableDocValueScan = false; - private boolean enableKeywordSniff = true; + // Whether to enable docvalues scan optimization for fetching fields more fast, default to true + private boolean enableDocValueScan = true; + // Whether to enable sniffing keyword for filtering more reasonable, default to true + private boolean enableKeywordSniff = true; + // if the number of fields which value extracted from `doc_value` exceeding this max limitation + // would downgrade to extract value from `stored_fields` + private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; + + // Solr doc_values vs stored_fields performance-smackdown indicate: + // It is possible to notice that retrieving an high number of fields leads + // to a sensible worsening of performance if DocValues are used. + // Instead, the (almost) surprising thing is that, by returning less than 20 fields, + // DocValues performs better than stored fields and the difference gets little as the number of fields returned increases. + // Asking for 9 DocValues fields and 1 stored field takes an average query time is 6.86 (more than returning 10 stored fields) + // Here we have a slightly conservative value of 20, but at the same time we also provide configurable parameters for expert-using + // @see `MAX_DOCVALUE_FIELDS` + private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20; + + // version would be used to be compatible with different ES Cluster public EsMajorVersion majorVersion = null; + // tableContext is used for being convenient to persist some configuration parameters uniformly private Map<String, String> tableContext = new HashMap<>(); // record the latest and recently exception when sync ES table metadata (mapping, shard location) @@ -104,6 +127,10 @@ public class EsTable extends Table { return esMetaStateTracker.searchContext().docValueFieldsContext(); } + public int maxDocValueFields() { + return maxDocValueFields; + } + public boolean isDocValueScanEnable() { return enableDocValueScan; } @@ -166,8 +193,6 @@ public class EsTable extends Table { + properties.get(VERSION).trim() + " ,`enable_docvalue_scan`" + " shoud be like 'true' or 'false', value should be double quotation marks"); } - } else { - enableDocValueScan = false; } if (properties.containsKey(KEYWORD_SNIFF)) { @@ -194,6 +219,17 @@ public class EsTable extends Table { + " but value is " + transport); } } + + if (properties.containsKey(MAX_DOCVALUE_FIELDS)) { + try { + maxDocValueFields = Integer.parseInt(properties.get(MAX_DOCVALUE_FIELDS).trim()); + if (maxDocValueFields < 0) { + maxDocValueFields = 0; + } + } catch (Exception e) { + maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; + } + } tableContext.put("hosts", hosts); tableContext.put("userName", userName); tableContext.put("passwd", passwd); @@ -205,6 +241,7 @@ public class EsTable extends Table { } tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan)); tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff)); + tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields)); } public TTableDescriptor toThrift() { @@ -294,6 +331,13 @@ public class EsTable extends Table { } else { enableKeywordSniff = true; } + if (tableContext.containsKey("maxDocValueFields")) { + try { + maxDocValueFields = Integer.parseInt(tableContext.get("maxDocValueFields")); + } catch (Exception e) { + maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; + } + } PartitionType partType = PartitionType.valueOf(Text.readString(in)); if (partType == PartitionType.UNPARTITIONED) { diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index 03cfc70..36984bf 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -18,6 +18,7 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.EsTable; @@ -40,9 +41,6 @@ import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -50,6 +48,9 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Range; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -59,7 +60,7 @@ import java.util.Random; import java.util.Set; public class EsScanNode extends ScanNode { - + private static final Logger LOG = LogManager.getLogger(EsScanNode.class); private final Random random = new Random(System.currentTimeMillis()); @@ -80,10 +81,10 @@ public class EsScanNode extends ScanNode { @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); - + assignBackends(); } - + @Override public int getNumInstances() { return shardScanRanges.size(); @@ -93,7 +94,7 @@ public class EsScanNode extends ScanNode { public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) { return shardScanRanges; } - + @Override public void finalize(Analyzer analyzer) throws UserException { if (isFinalized) { @@ -109,6 +110,34 @@ public class EsScanNode extends ScanNode { isFinalized = true; } + /** + * return whether can use the doc_values scan + * 0 and 1 are returned to facilitate Doris BE processing + * + * @param desc the fields needs to read from ES + * @param docValueContext the mapping for docvalues fields from origin field to doc_value fields + * @return + */ + private int useDocValueScan(TupleDescriptor desc, Map<String, String> docValueContext) { + ArrayList<SlotDescriptor> slotDescriptors = desc.getSlots(); + List<String> selectedFields = new ArrayList<>(slotDescriptors.size()); + for (SlotDescriptor slotDescriptor : slotDescriptors) { + selectedFields.add(slotDescriptor.getColumn().getName()); + } + if (selectedFields.size() > table.maxDocValueFields()) { + return 0; + } + Set<String> docValueFields = docValueContext.keySet(); + boolean useDocValue = true; + for (String selectedField : selectedFields) { + if (!docValueFields.contains(selectedField)) { + useDocValue = false; + break; + } + } + return useDocValue ? 1 : 0; + } + @Override protected void toThrift(TPlanNode msg) { if (EsTable.TRANSPORT_HTTP.equals(table.getTransport())) { @@ -123,6 +152,7 @@ public class EsScanNode extends ScanNode { esScanNode.setProperties(properties); if (table.isDocValueScanEnable()) { esScanNode.setDocvalue_context(table.docValueContext()); + properties.put(EsTable.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext()))); } if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) { esScanNode.setFields_context(table.fieldsContext()); @@ -169,9 +199,9 @@ public class EsScanNode extends ScanNode { } } if (LOG.isDebugEnabled()) { - LOG.debug("partition prune finished, unpartitioned index [{}], " - + "partitioned index [{}]", - String.join(",", unPartitionedIndices), + LOG.debug("partition prune finished, unpartitioned index [{}], " + + "partitioned index [{}]", + String.join(",", unPartitionedIndices), String.join(",", partitionedIndices)); } int beIndex = random.nextInt(backendList.size()); @@ -241,7 +271,7 @@ public class EsScanNode extends ScanNode { * if the index name is an alias or index pattern, then the es table is related * with one or more indices some indices could be pruned by using partition info * in index settings currently only support range partition setting - * + * * @param partitionInfo * @return * @throws AnalysisException @@ -254,7 +284,7 @@ public class EsScanNode extends ScanNode { switch (partitionInfo.getType()) { case RANGE: { RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo; - Map<Long, Range<PartitionKey>> keyRangeById = rangePartitionInfo.getIdToRange(false); + Map<Long, Range<PartitionKey>> keyRangeById = rangePartitionInfo.getIdToRange(false); partitionPruner = new RangePartitionPruner(keyRangeById, rangePartitionInfo.getPartitionColumns(), columnFilters); return partitionPruner.prune(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org