This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 68b9a2936a [improvement](doe) Step1: Fe generates the DSL and is used to explain (#9895) 68b9a2936a is described below commit 68b9a2936aeb749d344495f6eea4afc1d759bd28 Author: Stalary <stal...@163.com> AuthorDate: Mon Jul 18 23:20:58 2022 +0800 [improvement](doe) Step1: Fe generates the DSL and is used to explain (#9895) For the first step, I will only change FE and then change BE once I make sure the DSL is ok. --- .../java/org/apache/doris/catalog/Catalog.java | 1 - .../java/org/apache/doris/catalog/EsTable.java | 51 +---- .../doris/external/elasticsearch/EsUrls.java | 37 ++++ .../doris/external/elasticsearch/EsUtil.java | 233 +++++++++++++++++++++ .../external/elasticsearch/PartitionPhase.java | 6 +- .../external/elasticsearch/QueryBuilders.java | 65 +++++- .../java/org/apache/doris/planner/EsScanNode.java | 88 +++++--- .../doris/external/elasticsearch/EsUtilTest.java | 163 ++++++++++++-- fe/pom.xml | 2 +- 9 files changed, 547 insertions(+), 99 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 2b52760400..fbf341c029 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -2967,7 +2967,6 @@ public class Catalog { if (esTable.getMappingType() != null) { sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n"); } - sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\",\n"); sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n"); sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n"); sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\",\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index b03ce6d0ae..ba39f2d4d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -60,12 +60,15 @@ public class EsTable extends Table { public static final String VERSION = "version"; public static final String DOC_VALUES_MODE = "doc_values_mode"; - public static final String TRANSPORT_HTTP = "http"; public static final String DOC_VALUE_SCAN = "enable_docvalue_scan"; public static final String KEYWORD_SNIFF = "enable_keyword_sniff"; public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields"; public static final String NODES_DISCOVERY = "nodes_discovery"; public static final String HTTP_SSL_ENABLED = "http_ssl_enabled"; + public static final String ES_DSL = "es_dsl"; + public static final String INIT_SCROLL_URL = "init_scroll_url"; + public static final String NEXT_SCROLL_URL = "next_scroll_url"; + public static final String SEARCH_URL = "search_url"; private static final Logger LOG = LogManager.getLogger(EsTable.class); @@ -94,8 +97,6 @@ public class EsTable extends Table { // which type used for `indexName` private String mappingType = null; - // only support http - private String transport = "http"; // only save the partition definition, save the partition key, // partition list is got from es cluster dynamically and is saved in esTableState private PartitionInfo partitionInfo; @@ -263,7 +264,6 @@ public class EsTable extends Table { if (mappingType != null) { tableContext.put("mappingType", mappingType); } - tableContext.put("transport", transport); if (majorVersion != null) { tableContext.put("majorVersion", majorVersion.toString()); } @@ -296,7 +296,6 @@ public class EsTable extends Table { if (mappingType != null) { sb.append(mappingType); } - sb.append(transport); } else { for (Map.Entry<String, String> entry : tableContext.entrySet()) { sb.append(entry.getKey()); @@ -335,7 +334,6 @@ 
public class EsTable extends Table { passwd = tableContext.get("passwd"); indexName = tableContext.get("indexName"); mappingType = tableContext.get("mappingType"); - transport = tableContext.get("transport"); if (tableContext.containsKey("majorVersion")) { try { majorVersion = EsMajorVersion.parse(tableContext.get("majorVersion")); @@ -402,10 +400,6 @@ public class EsTable extends Table { return mappingType; } - public String getTransport() { - return transport; - } - public PartitionInfo getPartitionInfo() { return partitionInfo; } @@ -464,7 +458,7 @@ public class EsTable extends Table { JSONObject field = (JSONObject) mappingProps.get(key); // Complex types are not currently supported. if (field.containsKey("type")) { - Type type = toDorisType(field.get("type").toString()); + Type type = EsUtil.toDorisType(field.get("type").toString()); if (!type.isInvalid()) { Column column = new Column(); column.setName(key); @@ -477,39 +471,4 @@ public class EsTable extends Table { } return columns; } - - private Type toDorisType(String esType) { - // reference https://www.elastic.co/guide/en/elasticsearch/reference/8.3/sql-data-types.html - switch (esType) { - case "null": - return Type.NULL; - case "boolean": - return Type.BOOLEAN; - case "byte": - return Type.TINYINT; - case "short": - return Type.SMALLINT; - case "integer": - return Type.INT; - case "long": - case "unsigned_long": - return Type.BIGINT; - case "float": - case "half_float": - return Type.FLOAT; - case "double": - case "scaled_float": - return Type.DOUBLE; - case "keyword": - case "text": - case "ip": - case "nested": - case "object": - return Type.STRING; - case "date": - return Type.DATE; - default: - return Type.INVALID; - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUrls.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUrls.java new file mode 100644 index 0000000000..0df1effa4b --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUrls.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.elasticsearch; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Pack url. 
+ **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class EsUrls { + + private String searchUrl; + + private String initScrollUrl; + + private String nextScrollUrl; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java index f6800d1c3c..2b098a669a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java @@ -17,20 +17,41 @@ package org.apache.doris.external.elasticsearch; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.BoolLiteral; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.DateLiteral; +import org.apache.doris.analysis.DecimalLiteral; import org.apache.doris.analysis.DistributionDesc; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FloatLiteral; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.IsNullPredicate; +import org.apache.doris.analysis.LargeIntLiteral; +import org.apache.doris.analysis.LikePredicate; +import org.apache.doris.analysis.LikePredicate.Operator; import org.apache.doris.analysis.PartitionDesc; import org.apache.doris.analysis.RangePartitionDesc; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.EsTable; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder; +import org.apache.doris.thrift.TExprOpcode; import org.apache.commons.lang3.StringUtils; import org.json.simple.JSONObject; import 
org.json.simple.JSONValue; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Util for ES, some static method. @@ -210,4 +231,216 @@ public class EsUtil { searchContext.docValueFieldsContext().put(colName, docValueField); } } + + private static QueryBuilder toCompoundEsDsl(Expr expr) { + CompoundPredicate compoundPredicate = (CompoundPredicate) expr; + switch (compoundPredicate.getOp()) { + case AND: { + QueryBuilder left = toEsDsl(compoundPredicate.getChild(0)); + QueryBuilder right = toEsDsl(compoundPredicate.getChild(1)); + if (left != null && right != null) { + return QueryBuilders.boolQuery().must(left).must(right); + } + return null; + } + case OR: { + QueryBuilder left = toEsDsl(compoundPredicate.getChild(0)); + QueryBuilder right = toEsDsl(compoundPredicate.getChild(1)); + if (left != null && right != null) { + return QueryBuilders.boolQuery().should(left).should(right); + } + return null; + } + case NOT: { + QueryBuilder child = toEsDsl(compoundPredicate.getChild(0)); + if (child != null) { + return QueryBuilders.boolQuery().mustNot(child); + } + return null; + } + default: + return null; + } + } + + /** + * Doris expr to es dsl. + **/ + public static QueryBuilder toEsDsl(Expr expr) { + if (expr == null) { + return null; + } + // CompoundPredicate, `between` also converted to CompoundPredicate. 
+ if (expr instanceof CompoundPredicate) { + return toCompoundEsDsl(expr); + } + TExprOpcode opCode = expr.getOpcode(); + String column = ((SlotRef) expr.getChild(0)).getColumnName(); + if (expr instanceof BinaryPredicate) { + Object value = toDorisLiteral(expr.getChild(1)); + switch (opCode) { + case EQ: + case EQ_FOR_NULL: + return QueryBuilders.termQuery(column, value); + case NE: + return QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(column, value)); + case GE: + return QueryBuilders.rangeQuery(column).gte(value); + case GT: + return QueryBuilders.rangeQuery(column).gt(value); + case LE: + return QueryBuilders.rangeQuery(column).lte(value); + case LT: + return QueryBuilders.rangeQuery(column).lt(value); + default: + return null; + } + } + if (expr instanceof IsNullPredicate) { + IsNullPredicate isNullPredicate = (IsNullPredicate) expr; + if (isNullPredicate.isNotNull()) { + return QueryBuilders.existsQuery(column); + } + return QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(column)); + } + if (expr instanceof LikePredicate) { + LikePredicate likePredicate = (LikePredicate) expr; + if (likePredicate.getOp().equals(Operator.LIKE)) { + char[] chars = likePredicate.getChild(1).getStringValue().toCharArray(); + // example of translation : + // abc_123 ===> abc?123 + // abc%ykz ===> abc*ykz + // %abc123 ===> *abc123 + // _abc123 ===> ?abc123 + // \\_abc1 ===> \\_abc1 + // abc\\_123 ===> abc\\_123 + // abc\\%123 ===> abc\\%123 + // NOTE. user must input sql like 'abc\\_123' or 'abc\\%ykz' + for (int i = 0; i < chars.length; i++) { + if (chars[i] == '_' || chars[i] == '%') { + if (i == 0) { + chars[i] = (chars[i] == '_') ? '?' : '*'; + } else if (chars[i - 1] != '\\') { + chars[i] = (chars[i] == '_') ? '?'
: '*'; + } + } + } + return QueryBuilders.wildcardQuery(column, new String(chars)); + } else { + return QueryBuilders.wildcardQuery(column, likePredicate.getChild(1).getStringValue()); + } + } + if (expr instanceof InPredicate) { + InPredicate inPredicate = (InPredicate) expr; + List<Object> values = inPredicate.getListChildren().stream().map(EsUtil::toDorisLiteral) + .collect(Collectors.toList()); + if (inPredicate.isNotIn()) { + return QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery(column, values)); + } + return QueryBuilders.termsQuery(column, values); + } + if (expr instanceof FunctionCallExpr) { + FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr; + if ("esquery".equals(functionCallExpr.getFnName().getFunction())) { + String stringValue = functionCallExpr.getChild(1).getStringValue(); + return new QueryBuilders.EsQueryBuilder(stringValue); + } + } + return null; + } + + /** + * Transfer es type to doris type. + **/ + public static Type toDorisType(String esType) { + // reference https://www.elastic.co/guide/en/elasticsearch/reference/8.3/sql-data-types.html + switch (esType) { + case "null": + return Type.NULL; + case "boolean": + return Type.BOOLEAN; + case "byte": + return Type.TINYINT; + case "short": + return Type.SMALLINT; + case "integer": + return Type.INT; + case "long": + case "unsigned_long": + return Type.BIGINT; + case "float": + case "half_float": + return Type.FLOAT; + case "double": + case "scaled_float": + return Type.DOUBLE; + case "keyword": + case "text": + case "ip": + case "nested": + case "object": + return Type.STRING; + case "date": + return Type.DATE; + default: + return Type.INVALID; + } + } + + private static Object toDorisLiteral(Expr expr) { + if (!expr.isLiteral()) { + return null; + } + if (expr instanceof BoolLiteral) { + BoolLiteral boolLiteral = (BoolLiteral) expr; + return boolLiteral.getValue(); + } else if (expr instanceof DateLiteral) { + DateLiteral dateLiteral = (DateLiteral) expr; + return 
dateLiteral.getLongValue(); + } else if (expr instanceof DecimalLiteral) { + DecimalLiteral decimalLiteral = (DecimalLiteral) expr; + return decimalLiteral.getValue(); + } else if (expr instanceof FloatLiteral) { + FloatLiteral floatLiteral = (FloatLiteral) expr; + return floatLiteral.getValue(); + } else if (expr instanceof IntLiteral) { + IntLiteral intLiteral = (IntLiteral) expr; + return intLiteral.getValue(); + } else if (expr instanceof LargeIntLiteral) { + LargeIntLiteral largeIntLiteral = (LargeIntLiteral) expr; + return largeIntLiteral.getLongValue(); + } else if (expr instanceof StringLiteral) { + StringLiteral stringLiteral = (StringLiteral) expr; + return stringLiteral.getStringValue(); + } + return null; + } + + /** + * Generate url for be to query es. + **/ + public static EsUrls genEsUrls(String index, String type, boolean docValueMode, long limit, long batchSize) { + String filterPath = docValueMode ? "filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields" + : "filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id"; + if (limit <= 0) { + StringBuilder initScrollUrl = new StringBuilder(); + StringBuilder nextScrollUrl = new StringBuilder(); + initScrollUrl.append("/").append(index); + if (StringUtils.isNotBlank(type)) { + initScrollUrl.append("/").append(type); + } + initScrollUrl.append("/_search?").append(filterPath).append("&terminate_after=") + .append(batchSize); + nextScrollUrl.append("/_search/scroll?").append(filterPath); + return new EsUrls(null, initScrollUrl.toString(), nextScrollUrl.toString()); + } else { + StringBuilder searchUrl = new StringBuilder(); + searchUrl.append("/").append(index); + if (StringUtils.isNotBlank(type)) { + searchUrl.append("/").append(type); + } + searchUrl.append("/_search?terminate_after=").append(limit).append("&").append(filterPath); + return new EsUrls(searchUrl.toString(), null, null); + } + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java index bb5d416f56..f13db98fb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java @@ -17,8 +17,6 @@ package org.apache.doris.external.elasticsearch; -import org.apache.doris.catalog.EsTable; - import java.util.HashMap; import java.util.Map; @@ -53,8 +51,6 @@ public class PartitionPhase implements SearchPhase { @Override public void postProcess(SearchContext context) throws DorisEsException { context.partitions(shardPartitions); - if (EsTable.TRANSPORT_HTTP.equals(context.esTable().getTransport())) { - context.partitions().addHttpAddress(nodesInfo); - } + context.partitions().addHttpAddress(nodesInfo); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java index 92aea914bd..3dd800cb91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java @@ -18,10 +18,17 @@ package org.apache.doris.external.elasticsearch; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; +import java.io.StringWriter; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import java.util.Objects; @@ -163,7 +170,11 @@ public final class QueryBuilders { /** * Base class to build various ES queries */ - abstract static class QueryBuilder { + public abstract static class 
QueryBuilder { + + private static final Logger LOG = LogManager.getLogger(QueryBuilder.class); + + final ObjectMapper mapper = new ObjectMapper(); /** * Convert query to JSON format @@ -172,19 +183,64 @@ public final class QueryBuilders { * @throws IOException if IO error occurred */ abstract void toJson(JsonGenerator out) throws IOException; + + /** + * Convert query to JSON format and catch error. + **/ + public String toJson() { + StringWriter writer = new StringWriter(); + try { + JsonGenerator gen = mapper.getFactory().createGenerator(writer); + this.toJson(gen); + gen.flush(); + gen.close(); + } catch (IOException e) { + LOG.warn("QueryBuilder toJson error", e); + return null; + } + return writer.toString(); + } + } + + /** + * Use for esquery, directly save value. + **/ + public static class EsQueryBuilder extends QueryBuilder { + + private final String value; + + public EsQueryBuilder(String value) { + this.value = value; + } + + @Override + void toJson(JsonGenerator out) throws IOException { + JsonNode jsonNode = mapper.readTree(value); + out.writeStartObject(); + Iterator<Entry<String, JsonNode>> values = jsonNode.fields(); + while (values.hasNext()) { + Entry<String, JsonNode> value = values.next(); + out.writeFieldName(value.getKey()); + out.writeObject(value.getValue()); + } + out.writeEndObject(); + } } /** * A Query that matches documents matching boolean combinations of other queries. */ - static class BoolQueryBuilder extends QueryBuilder { + public static class BoolQueryBuilder extends QueryBuilder { private final List<QueryBuilder> mustClauses = new ArrayList<>(); private final List<QueryBuilder> mustNotClauses = new ArrayList<>(); private final List<QueryBuilder> filterClauses = new ArrayList<>(); private final List<QueryBuilder> shouldClauses = new ArrayList<>(); - BoolQueryBuilder must(QueryBuilder queryBuilder) { + /** + * Use for EsScanNode generate dsl. 
+ **/ + public BoolQueryBuilder must(QueryBuilder queryBuilder) { Objects.requireNonNull(queryBuilder); mustClauses.add(queryBuilder); return this; @@ -221,8 +277,7 @@ public final class QueryBuilders { out.writeEndObject(); } - private void writeJsonArray(String field, List<QueryBuilder> clauses, JsonGenerator out) - throws IOException { + private void writeJsonArray(String field, List<QueryBuilder> clauses, JsonGenerator out) throws IOException { if (clauses.isEmpty()) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java index f37704fd9d..fc557e7260 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -18,6 +18,7 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Catalog; @@ -30,6 +31,12 @@ import org.apache.doris.common.UserException; import org.apache.doris.external.elasticsearch.EsShardPartitions; import org.apache.doris.external.elasticsearch.EsShardRouting; import org.apache.doris.external.elasticsearch.EsTablePartitions; +import org.apache.doris.external.elasticsearch.EsUrls; +import org.apache.doris.external.elasticsearch.EsUtil; +import org.apache.doris.external.elasticsearch.QueryBuilders; +import org.apache.doris.external.elasticsearch.QueryBuilders.BoolQueryBuilder; +import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TEsScanNode; @@ -47,6 +54,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import 
com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import lombok.SneakyThrows; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,6 +66,9 @@ import java.util.Map; import java.util.Random; import java.util.Set; +/** + * ScanNode for Elasticsearch. + **/ public class EsScanNode extends ScanNode { private static final Logger LOG = LogManager.getLogger(EsScanNode.class); @@ -68,8 +79,8 @@ public class EsScanNode extends ScanNode { private EsTablePartitions esTablePartitions; private List<TScanRangeLocations> shardScanRanges = Lists.newArrayList(); private EsTable table; - - boolean isFinalized = false; + private QueryBuilder queryBuilder; + private boolean isFinalized = false; public EsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { super(id, desc, planNodeName, StatisticalType.ES_SCAN_NODE); @@ -114,9 +125,8 @@ public class EsScanNode extends ScanNode { * return whether can use the doc_values scan * 0 and 1 are returned to facilitate Doris BE processing * - * @param desc the fields needs to read from ES + * @param desc the fields needs to read from ES * @param docValueContext the mapping for docvalues fields from origin field to doc_value fields - * @return */ private int useDocValueScan(TupleDescriptor desc, Map<String, String> docValueContext) { ArrayList<SlotDescriptor> slotDescriptors = desc.getSlots(); @@ -138,13 +148,11 @@ public class EsScanNode extends ScanNode { return useDocValue ? 
1 : 0; } + @SneakyThrows @Override protected void toThrift(TPlanNode msg) { - if (EsTable.TRANSPORT_HTTP.equals(table.getTransport())) { - msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE; - } else { - msg.node_type = TPlanNodeType.ES_SCAN_NODE; - } + buildQuery(); + msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE; Map<String, String> properties = Maps.newHashMap(); properties.put(EsTable.USER, table.getUserName()); properties.put(EsTable.PASSWORD, table.getPasswd()); @@ -155,6 +163,17 @@ public class EsScanNode extends ScanNode { esScanNode.setDocvalueContext(table.docValueContext()); properties.put(EsTable.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext()))); } + properties.put(EsTable.ES_DSL, queryBuilder.toJson()); + + // Be use it add es host_port and shardId to query. + EsUrls esUrls = EsUtil.genEsUrls(table.getIndexName(), table.getMappingType(), table.isDocValueScanEnable(), + ConnectContext.get().getSessionVariable().batchSize, msg.limit); + if (esUrls.getSearchUrl() != null) { + properties.put(EsTable.SEARCH_URL, esUrls.getSearchUrl()); + } else { + properties.put(EsTable.INIT_SCROLL_URL, esUrls.getInitScrollUrl()); + properties.put(EsTable.NEXT_SCROLL_URL, esUrls.getNextScrollUrl()); + } if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) { esScanNode.setFieldsContext(table.fieldsContext()); } @@ -181,8 +200,8 @@ public class EsScanNode extends ScanNode { // info is generated from es cluster state dynamically if (esTablePartitions == null) { if (table.getLastMetaDataSyncException() != null) { - throw new UserException("fetch es table [" + table.getName() - + "] metadata failure: " + table.getLastMetaDataSyncException().getLocalizedMessage()); + throw new UserException("fetch es table [" + table.getName() + "] metadata failure: " + + table.getLastMetaDataSyncException().getLocalizedMessage()); } throw new UserException("EsTable metadata has not been synced, Try it later"); } @@ -202,10 +221,8 @@ public class 
EsScanNode extends ScanNode { } } if (LOG.isDebugEnabled()) { - LOG.debug("partition prune finished, unpartitioned index [{}], " - + "partitioned index [{}]", - String.join(",", unPartitionedIndices), - String.join(",", partitionedIndices)); + LOG.debug("partition prune finished, unpartitioned index [{}], " + "partitioned index [{}]", + String.join(",", unPartitionedIndices), String.join(",", partitionedIndices)); } int size = backendList.size(); int beIndex = random.nextInt(size); @@ -217,8 +234,7 @@ public class EsScanNode extends ScanNode { int numBe = Math.min(3, size); List<TNetworkAddress> shardAllocations = new ArrayList<>(); for (EsShardRouting item : shardRouting) { - shardAllocations.add(EsTable.TRANSPORT_HTTP.equals(table.getTransport()) - ? item.getHttpAddress() : item.getAddress()); + shardAllocations.add(item.getHttpAddress()); } Collections.shuffle(shardAllocations, random); @@ -279,15 +295,13 @@ public class EsScanNode extends ScanNode { * with one or more indices some indices could be pruned by using partition info * in index settings currently only support range partition setting * - * @param partitionInfo - * @return - * @throws AnalysisException + * @param partitionInfo partitionInfo */ private Collection<Long> partitionPrune(PartitionInfo partitionInfo) throws AnalysisException { if (partitionInfo == null) { return null; } - PartitionPruner partitionPruner = null; + PartitionPruner partitionPruner; switch (partitionInfo.getType()) { case RANGE: { RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo; @@ -319,22 +333,40 @@ public class EsScanNode extends ScanNode { } if (!conjuncts.isEmpty()) { - output.append(prefix).append("PREDICATES: ").append( - getExplainString(conjuncts)).append("\n"); + output.append(prefix).append("PREDICATES: ").append(getExplainString(conjuncts)).append("\n"); // reserved for later using: LOCAL_PREDICATES is processed by Doris EsScanNode output.append(prefix).append("LOCAL_PREDICATES: ").append(" 
").append("\n"); // reserved for later using: REMOTE_PREDICATES is processed by remote ES Cluster output.append(prefix).append("REMOTE_PREDICATES: ").append(" ").append("\n"); - // reserved for later using: translate predicates to ES queryDSL - output.append(prefix).append("ES_QUERY_DSL: ").append(" ").append("\n"); + buildQuery(); + output.append(prefix).append("ES_QUERY_DSL: ").append(queryBuilder.toJson()).append("\n"); } else { output.append(prefix).append("ES_QUERY_DSL: ").append("{\"match_all\": {}}").append("\n"); } String indexName = table.getIndexName(); String typeName = table.getMappingType(); - output.append(prefix) - .append(String.format("ES index/type: %s/%s", indexName, typeName)) - .append("\n"); + output.append(prefix).append(String.format("ES index/type: %s/%s", indexName, typeName)).append("\n"); return output.toString(); } + + private void buildQuery() { + if (conjuncts.isEmpty()) { + queryBuilder = QueryBuilders.matchAllQuery(); + } else { + boolean hasFilter = false; + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + for (Expr expr : conjuncts) { + QueryBuilder queryBuilder = EsUtil.toEsDsl(expr); + if (queryBuilder != null) { + hasFilter = true; + boolQueryBuilder.must(queryBuilder); + } + } + if (!hasFilter) { + queryBuilder = QueryBuilders.matchAllQuery(); + } else { + queryBuilder = boolQueryBuilder; + } + } + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java index ca7608923b..93fc870db4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java @@ -17,6 +17,17 @@ package org.apache.doris.external.elasticsearch; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.BinaryPredicate.Operator; +import org.apache.doris.analysis.CompoundPredicate; 
+import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.IsNullPredicate; +import org.apache.doris.analysis.LikePredicate; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.EsTable; import org.apache.doris.catalog.PrimitiveType; @@ -29,6 +40,7 @@ import org.json.simple.JSONValue; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import java.util.ArrayList; import java.util.List; @@ -40,22 +52,14 @@ public class EsUtilTest extends EsTestCase { private List<Column> columns = new ArrayList<>(); - private String jsonStr = "{\"settings\": {\n" - + " \"index\": {\n" - + " \"bpack\": {\n" - + " \"partition\": {\n" - + " \"upperbound\": \"12\"\n" - + " }\n" - + " },\n" - + " \"number_of_shards\": \"5\",\n" + private String jsonStr = "{\"settings\": {\n" + " \"index\": {\n" + " \"bpack\": {\n" + + " \"partition\": {\n" + " \"upperbound\": \"12\"\n" + + " }\n" + " },\n" + " \"number_of_shards\": \"5\",\n" + " \"provided_name\": \"indexa\",\n" + " \"creation_date\": \"1539328532060\",\n" + " \"number_of_replicas\": \"1\",\n" - + " \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n" - + " \"version\": {\n" - + " \"created\": \"5050099\"\n" - + " }\n" - + " }\n" + + " \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n" + " \"version\": {\n" + + " \"created\": \"5050099\"\n" + " }\n" + " }\n" + " }}"; /** @@ -153,4 +157,137 @@ public class EsUtilTest extends EsTestCase { EsUtil.getJsonObject(json, "settings.index.bpack.partition.upperbound", 0); } + @Test + public void testBinaryPredicateConvertEsDsl() { + SlotRef k1 = new SlotRef(null, "k1"); + IntLiteral intLiteral = new IntLiteral(3); + Expr eqExpr = new BinaryPredicate(Operator.EQ, k1, intLiteral); + Expr neExpr 
= new BinaryPredicate(Operator.NE, k1, intLiteral); + Expr leExpr = new BinaryPredicate(Operator.LE, k1, intLiteral); + Expr geExpr = new BinaryPredicate(Operator.GE, k1, intLiteral); + Expr ltExpr = new BinaryPredicate(Operator.LT, k1, intLiteral); + Expr gtExpr = new BinaryPredicate(Operator.GT, k1, intLiteral); + Expr efnExpr = new BinaryPredicate(Operator.EQ_FOR_NULL, new SlotRef(null, "k1"), new IntLiteral(3)); + Assert.assertEquals("{\"term\":{\"k1\":3}}", EsUtil.toEsDsl(eqExpr).toJson()); + Assert.assertEquals("{\"bool\":{\"must_not\":{\"term\":{\"k1\":3}}}}", EsUtil.toEsDsl(neExpr).toJson()); + Assert.assertEquals("{\"range\":{\"k1\":{\"lte\":3}}}", EsUtil.toEsDsl(leExpr).toJson()); + Assert.assertEquals("{\"range\":{\"k1\":{\"gte\":3}}}", EsUtil.toEsDsl(geExpr).toJson()); + Assert.assertEquals("{\"range\":{\"k1\":{\"lt\":3}}}", EsUtil.toEsDsl(ltExpr).toJson()); + Assert.assertEquals("{\"range\":{\"k1\":{\"gt\":3}}}", EsUtil.toEsDsl(gtExpr).toJson()); + Assert.assertEquals("{\"term\":{\"k1\":3}}", EsUtil.toEsDsl(efnExpr).toJson()); + } + + @Test + public void testCompoundPredicateConvertEsDsl() { + SlotRef k1 = new SlotRef(null, "k1"); + IntLiteral intLiteral1 = new IntLiteral(3); + SlotRef k2 = new SlotRef(null, "k2"); + IntLiteral intLiteral2 = new IntLiteral(5); + BinaryPredicate binaryPredicate1 = new BinaryPredicate(Operator.EQ, k1, intLiteral1); + BinaryPredicate binaryPredicate2 = new BinaryPredicate(Operator.GT, k2, intLiteral2); + CompoundPredicate andPredicate = new CompoundPredicate(CompoundPredicate.Operator.AND, binaryPredicate1, + binaryPredicate2); + CompoundPredicate orPredicate = new CompoundPredicate(CompoundPredicate.Operator.OR, binaryPredicate1, + binaryPredicate2); + CompoundPredicate notPredicate = new CompoundPredicate(CompoundPredicate.Operator.NOT, binaryPredicate1, null); + Assert.assertEquals("{\"bool\":{\"must\":[{\"term\":{\"k1\":3}},{\"range\":{\"k2\":{\"gt\":5}}}]}}", + EsUtil.toEsDsl(andPredicate).toJson()); + 
Assert.assertEquals("{\"bool\":{\"should\":[{\"term\":{\"k1\":3}},{\"range\":{\"k2\":{\"gt\":5}}}]}}", + EsUtil.toEsDsl(orPredicate).toJson()); + Assert.assertEquals("{\"bool\":{\"must_not\":{\"term\":{\"k1\":3}}}}", EsUtil.toEsDsl(notPredicate).toJson()); + } + + @Test + public void testIsNullPredicateConvertEsDsl() { + SlotRef k1 = new SlotRef(null, "k1"); + IsNullPredicate isNullPredicate = new IsNullPredicate(k1, false); + IsNullPredicate isNotNullPredicate = new IsNullPredicate(k1, true); + Assert.assertEquals("{\"bool\":{\"must_not\":{\"exists\":{\"field\":\"k1\"}}}}", + EsUtil.toEsDsl(isNullPredicate).toJson()); + Assert.assertEquals("{\"exists\":{\"field\":\"k1\"}}", EsUtil.toEsDsl(isNotNullPredicate).toJson()); + } + + @Test + public void testLikePredicateConvertEsDsl() { + SlotRef k1 = new SlotRef(null, "k1"); + StringLiteral stringLiteral1 = new StringLiteral("%1%"); + StringLiteral stringLiteral2 = new StringLiteral("*1*"); + StringLiteral stringLiteral3 = new StringLiteral("1_2"); + LikePredicate likePredicate1 = new LikePredicate(LikePredicate.Operator.LIKE, k1, stringLiteral1); + LikePredicate regexPredicate = new LikePredicate(LikePredicate.Operator.REGEXP, k1, stringLiteral2); + LikePredicate likePredicate2 = new LikePredicate(LikePredicate.Operator.LIKE, k1, stringLiteral3); + Assert.assertEquals("{\"wildcard\":{\"k1\":\"*1*\"}}", EsUtil.toEsDsl(likePredicate1).toJson()); + Assert.assertEquals("{\"wildcard\":{\"k1\":\"*1*\"}}", EsUtil.toEsDsl(regexPredicate).toJson()); + Assert.assertEquals("{\"wildcard\":{\"k1\":\"1?2\"}}", EsUtil.toEsDsl(likePredicate2).toJson()); + } + + @Test + public void testInPredicateConvertEsDsl() { + SlotRef k1 = new SlotRef(null, "k1"); + IntLiteral intLiteral1 = new IntLiteral(3); + IntLiteral intLiteral2 = new IntLiteral(5); + List<Expr> intLiterals = new ArrayList<>(); + intLiterals.add(intLiteral1); + intLiterals.add(intLiteral2); + InPredicate isInPredicate = new InPredicate(k1, intLiterals, false); + InPredicate 
isNotInPredicate = new InPredicate(k1, intLiterals, true); + Assert.assertEquals("{\"terms\":{\"k1\":[3,5]}}", EsUtil.toEsDsl(isInPredicate).toJson()); + Assert.assertEquals("{\"bool\":{\"must_not\":{\"terms\":{\"k1\":[3,5]}}}}", + EsUtil.toEsDsl(isNotInPredicate).toJson()); + } + + @Test + public void testFunctionCallConvertEsDsl() { + SlotRef k1 = new SlotRef(null, "k1"); + String str = "{\"bool\":{\"must_not\":{\"terms\":{\"k1\":[3,5]}}}}"; + StringLiteral stringLiteral = new StringLiteral(str); + List<Expr> exprs = new ArrayList<>(); + exprs.add(k1); + exprs.add(stringLiteral); + FunctionCallExpr functionCallExpr = new FunctionCallExpr("esquery", exprs); + Assert.assertEquals(str, EsUtil.toEsDsl(functionCallExpr).toJson()); + + SlotRef k2 = new SlotRef(null, "k2"); + IntLiteral intLiteral = new IntLiteral(5); + BinaryPredicate binaryPredicate = new BinaryPredicate(Operator.EQ, k2, intLiteral); + CompoundPredicate compoundPredicate = new CompoundPredicate(CompoundPredicate.Operator.AND, binaryPredicate, + functionCallExpr); + Assert.assertEquals( + "{\"bool\":{\"must\":[{\"term\":{\"k2\":5}},{\"bool\":{\"must_not\":{\"terms\":{\"k1\":[3,5]}}}}]}}", + EsUtil.toEsDsl(compoundPredicate).toJson()); + } + + @Test + public void testGenEsUrls() { + EsUrls typeLimit = EsUtil.genEsUrls("test", "_doc", false, 10, 1024); + Assertions.assertEquals( + "/test/_doc/_search?terminate_after=10&filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id", + typeLimit.getSearchUrl()); + Assertions.assertNull(typeLimit.getInitScrollUrl()); + Assertions.assertNull(typeLimit.getNextScrollUrl()); + + Assertions.assertEquals( + "/test/_search?terminate_after=10&filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id", + EsUtil.genEsUrls("test", null, false, 10, 1024).getSearchUrl()); + + EsUrls typeNoLimit = EsUtil.genEsUrls("test", "_doc", false, -1, 1024); + Assertions.assertEquals( + 
"/test/_doc/_search?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id&terminate_after=1024", + typeNoLimit.getInitScrollUrl()); + Assertions.assertEquals("/_search/scroll?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id", + typeNoLimit.getNextScrollUrl()); + Assertions.assertNull(typeNoLimit.getSearchUrl()); + + EsUrls noTypeNoLimit = EsUtil.genEsUrls("test", null, false, -1, 2048); + Assertions.assertEquals( + "/test/_search?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id&terminate_after=2048", + noTypeNoLimit.getInitScrollUrl()); + Assertions.assertEquals("/_search/scroll?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id", + noTypeNoLimit.getNextScrollUrl()); + + EsUrls docValueTypeLimit = EsUtil.genEsUrls("test", "_doc", true, 100, 1024); + Assertions.assertEquals( + "/test/_doc/_search?terminate_after=100&filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields", + docValueTypeLimit.getSearchUrl()); + } } diff --git a/fe/pom.xml b/fe/pom.xml index 0c3be8cca1..cb4abf93ec 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -230,7 +230,7 @@ under the License. <commons-collections.version>3.2.2</commons-collections.version> <scala.version>2.12.10</scala.version> <kryo.version>4.0.2</kryo.version> - <lombok.version>1.18.16</lombok.version> + <lombok.version>1.18.24</lombok.version> <tree-printer.version>1.2</tree-printer.version> <hamcrest.version>2.1</hamcrest.version> <httpclient.version>4.5.13</httpclient.version> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org