This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 68b9a2936a [improvement](doe) Step1: Fe generates the DSL and is used to explain (#9895)
68b9a2936a is described below

commit 68b9a2936aeb749d344495f6eea4afc1d759bd28
Author: Stalary <stal...@163.com>
AuthorDate: Mon Jul 18 23:20:58 2022 +0800

    [improvement](doe) Step1: Fe generates the DSL and is used to explain (#9895)
    
    For the first step, I will only change FE and then change BE once I make sure the DSL is ok.
---
 .../java/org/apache/doris/catalog/Catalog.java     |   1 -
 .../java/org/apache/doris/catalog/EsTable.java     |  51 +----
 .../doris/external/elasticsearch/EsUrls.java       |  37 ++++
 .../doris/external/elasticsearch/EsUtil.java       | 233 +++++++++++++++++++++
 .../external/elasticsearch/PartitionPhase.java     |   6 +-
 .../external/elasticsearch/QueryBuilders.java      |  65 +++++-
 .../java/org/apache/doris/planner/EsScanNode.java  |  88 +++++---
 .../doris/external/elasticsearch/EsUtilTest.java   | 163 ++++++++++++--
 fe/pom.xml                                         |   2 +-
 9 files changed, 547 insertions(+), 99 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 2b52760400..fbf341c029 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -2967,7 +2967,6 @@ public class Catalog {
             if (esTable.getMappingType() != null) {
                 sb.append("\"type\" = 
\"").append(esTable.getMappingType()).append("\",\n");
             }
-            sb.append("\"transport\" = 
\"").append(esTable.getTransport()).append("\",\n");
             sb.append("\"enable_docvalue_scan\" = 
\"").append(esTable.isDocValueScanEnable()).append("\",\n");
             sb.append("\"max_docvalue_fields\" = 
\"").append(esTable.maxDocValueFields()).append("\",\n");
             sb.append("\"enable_keyword_sniff\" = 
\"").append(esTable.isKeywordSniffEnable()).append("\",\n");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
index b03ce6d0ae..ba39f2d4d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -60,12 +60,15 @@ public class EsTable extends Table {
     public static final String VERSION = "version";
     public static final String DOC_VALUES_MODE = "doc_values_mode";
 
-    public static final String TRANSPORT_HTTP = "http";
     public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
     public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
     public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields";
     public static final String NODES_DISCOVERY = "nodes_discovery";
     public static final String HTTP_SSL_ENABLED = "http_ssl_enabled";
+    public static final String ES_DSL = "es_dsl";
+    public static final String INIT_SCROLL_URL = "init_scroll_url";
+    public static final String NEXT_SCROLL_URL = "next_scroll_url";
+    public static final String SEARCH_URL = "search_url";
 
     private static final Logger LOG = LogManager.getLogger(EsTable.class);
 
@@ -94,8 +97,6 @@ public class EsTable extends Table {
 
     // which type used for `indexName`
     private String mappingType = null;
-    // only support http
-    private String transport = "http";
     // only save the partition definition, save the partition key,
     // partition list is got from es cluster dynamically and is saved in 
esTableState
     private PartitionInfo partitionInfo;
@@ -263,7 +264,6 @@ public class EsTable extends Table {
         if (mappingType != null) {
             tableContext.put("mappingType", mappingType);
         }
-        tableContext.put("transport", transport);
         if (majorVersion != null) {
             tableContext.put("majorVersion", majorVersion.toString());
         }
@@ -296,7 +296,6 @@ public class EsTable extends Table {
             if (mappingType != null) {
                 sb.append(mappingType);
             }
-            sb.append(transport);
         } else {
             for (Map.Entry<String, String> entry : tableContext.entrySet()) {
                 sb.append(entry.getKey());
@@ -335,7 +334,6 @@ public class EsTable extends Table {
         passwd = tableContext.get("passwd");
         indexName = tableContext.get("indexName");
         mappingType = tableContext.get("mappingType");
-        transport = tableContext.get("transport");
         if (tableContext.containsKey("majorVersion")) {
             try {
                 majorVersion = 
EsMajorVersion.parse(tableContext.get("majorVersion"));
@@ -402,10 +400,6 @@ public class EsTable extends Table {
         return mappingType;
     }
 
-    public String getTransport() {
-        return transport;
-    }
-
     public PartitionInfo getPartitionInfo() {
         return partitionInfo;
     }
@@ -464,7 +458,7 @@ public class EsTable extends Table {
             JSONObject field = (JSONObject) mappingProps.get(key);
             // Complex types are not currently supported.
             if (field.containsKey("type")) {
-                Type type = toDorisType(field.get("type").toString());
+                Type type = EsUtil.toDorisType(field.get("type").toString());
                 if (!type.isInvalid()) {
                     Column column = new Column();
                     column.setName(key);
@@ -477,39 +471,4 @@ public class EsTable extends Table {
         }
         return columns;
     }
-
-    private Type toDorisType(String esType) {
-        // reference 
https://www.elastic.co/guide/en/elasticsearch/reference/8.3/sql-data-types.html
-        switch (esType) {
-            case "null":
-                return Type.NULL;
-            case "boolean":
-                return Type.BOOLEAN;
-            case "byte":
-                return Type.TINYINT;
-            case "short":
-                return Type.SMALLINT;
-            case "integer":
-                return Type.INT;
-            case "long":
-            case "unsigned_long":
-                return Type.BIGINT;
-            case "float":
-            case "half_float":
-                return Type.FLOAT;
-            case "double":
-            case "scaled_float":
-                return Type.DOUBLE;
-            case "keyword":
-            case "text":
-            case "ip":
-            case "nested":
-            case "object":
-                return Type.STRING;
-            case "date":
-                return Type.DATE;
-            default:
-                return Type.INVALID;
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUrls.java 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUrls.java
new file mode 100644
index 0000000000..0df1effa4b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUrls.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external.elasticsearch;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Holds the URLs used to query ES: a search URL, or the initial and next scroll URLs.
+ **/
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class EsUrls {
+
+    private String searchUrl;
+
+    private String initScrollUrl;
+
+    private String nextScrollUrl;
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
index f6800d1c3c..2b098a669a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
@@ -17,20 +17,41 @@
 
 package org.apache.doris.external.elasticsearch;
 
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DecimalLiteral;
 import org.apache.doris.analysis.DistributionDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LargeIntLiteral;
+import org.apache.doris.analysis.LikePredicate;
+import org.apache.doris.analysis.LikePredicate.Operator;
 import org.apache.doris.analysis.PartitionDesc;
 import org.apache.doris.analysis.RangePartitionDesc;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.EsTable;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder;
+import org.apache.doris.thrift.TExprOpcode;
 
 import org.apache.commons.lang3.StringUtils;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Util for ES, some static method.
@@ -210,4 +231,216 @@ public class EsUtil {
             searchContext.docValueFieldsContext().put(colName, docValueField);
         }
     }
+
+    private static QueryBuilder toCompoundEsDsl(Expr expr) {
+        CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+        switch (compoundPredicate.getOp()) {
+            case AND: {
+                QueryBuilder left = toEsDsl(compoundPredicate.getChild(0));
+                QueryBuilder right = toEsDsl(compoundPredicate.getChild(1));
+                if (left != null && right != null) {
+                    return QueryBuilders.boolQuery().must(left).must(right);
+                }
+                return null;
+            }
+            case OR: {
+                QueryBuilder left = toEsDsl(compoundPredicate.getChild(0));
+                QueryBuilder right = toEsDsl(compoundPredicate.getChild(1));
+                if (left != null && right != null) {
+                    return 
QueryBuilders.boolQuery().should(left).should(right);
+                }
+                return null;
+            }
+            case NOT: {
+                QueryBuilder child = toEsDsl(compoundPredicate.getChild(0));
+                if (child != null) {
+                    return QueryBuilders.boolQuery().mustNot(child);
+                }
+                return null;
+            }
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Doris expr to ES DSL; returns null if unsupported (e.g. the first child is not a column).
+     **/
+    public static QueryBuilder toEsDsl(Expr expr) {
+        // CompoundPredicate, `between` also converted to CompoundPredicate.
+        if (expr instanceof CompoundPredicate) {
+            return toCompoundEsDsl(expr);
+        }
+        if (expr == null || expr.getChildren().isEmpty() || !(expr.getChild(0) instanceof SlotRef)) {
+            return null;
+        }
+        TExprOpcode opCode = expr.getOpcode();
+        String column = ((SlotRef) expr.getChild(0)).getColumnName();
+        if (expr instanceof BinaryPredicate) {
+            Object value = toDorisLiteral(expr.getChild(1));
+            switch (opCode) {
+                case EQ:
+                case EQ_FOR_NULL:
+                    return QueryBuilders.termQuery(column, value);
+                case NE:
+                    return 
QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(column, value));
+                case GE:
+                    return QueryBuilders.rangeQuery(column).gte(value);
+                case GT:
+                    return QueryBuilders.rangeQuery(column).gt(value);
+                case LE:
+                    return QueryBuilders.rangeQuery(column).lte(value);
+                case LT:
+                    return QueryBuilders.rangeQuery(column).lt(value);
+                default:
+                    return null;
+            }
+        }
+        if (expr instanceof IsNullPredicate) {
+            IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
+            if (isNullPredicate.isNotNull()) {
+                return QueryBuilders.existsQuery(column);
+            }
+            return 
QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(column));
+        }
+        if (expr instanceof LikePredicate) {
+            LikePredicate likePredicate = (LikePredicate) expr;
+            if (likePredicate.getOp().equals(Operator.LIKE)) {
+                char[] chars = 
likePredicate.getChild(1).getStringValue().toCharArray();
+                // example of translation :
+                //      abc_123  ===> abc?123
+                //      abc%ykz  ===> abc*ykz
+                //      %abc123  ===> *abc123
+                //      _abc123  ===> ?abc123
+                //      \\_abc1  ===> \\_abc1
+                //      abc\\_123 ===> abc\\_123
+                //      abc\\%123 ===> abc\\%123
+                // NOTE: to match a literal '_' or '%', the user must write it escaped, like 'abc\\_123' or 'abc\\%ykz'
+                for (int i = 0; i < chars.length; i++) {
+                    if (chars[i] == '_' || chars[i] == '%') {
+                        if (i == 0) {
+                            chars[i] = (chars[i] == '_') ? '?' : '*';
+                        } else if (chars[i - 1] != '\\') {
+                            chars[i] = (chars[i] == '_') ? '?' : '*';
+                        }
+                    }
+                }
+                return QueryBuilders.wildcardQuery(column, new String(chars));
+            } else {
+                return QueryBuilders.wildcardQuery(column, 
likePredicate.getChild(1).getStringValue());
+            }
+        }
+        if (expr instanceof InPredicate) {
+            InPredicate inPredicate = (InPredicate) expr;
+            List<Object> values = 
inPredicate.getListChildren().stream().map(EsUtil::toDorisLiteral)
+                    .collect(Collectors.toList());
+            if (inPredicate.isNotIn()) {
+                return 
QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery(column, values));
+            }
+            return QueryBuilders.termsQuery(column, values);
+        }
+        if (expr instanceof FunctionCallExpr) {
+            FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
+            if ("esquery".equals(functionCallExpr.getFnName().getFunction())) {
+                String stringValue = 
functionCallExpr.getChild(1).getStringValue();
+                return new QueryBuilders.EsQueryBuilder(stringValue);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Map an ES field type to a Doris type; unsupported ES types map to Type.INVALID.
+     **/
+    public static Type toDorisType(String esType) {
+        // reference 
https://www.elastic.co/guide/en/elasticsearch/reference/8.3/sql-data-types.html
+        switch (esType) {
+            case "null":
+                return Type.NULL;
+            case "boolean":
+                return Type.BOOLEAN;
+            case "byte":
+                return Type.TINYINT;
+            case "short":
+                return Type.SMALLINT;
+            case "integer":
+                return Type.INT;
+            case "long":
+            case "unsigned_long":
+                return Type.BIGINT;
+            case "float":
+            case "half_float":
+                return Type.FLOAT;
+            case "double":
+            case "scaled_float":
+                return Type.DOUBLE;
+            case "keyword":
+            case "text":
+            case "ip":
+            case "nested":
+            case "object":
+                return Type.STRING;
+            case "date":
+                return Type.DATE;
+            default:
+                return Type.INVALID;
+        }
+    }
+
+    private static Object toDorisLiteral(Expr expr) {
+        if (!expr.isLiteral()) {
+            return null;
+        }
+        if (expr instanceof BoolLiteral) {
+            BoolLiteral boolLiteral = (BoolLiteral) expr;
+            return boolLiteral.getValue();
+        } else if (expr instanceof DateLiteral) {
+            DateLiteral dateLiteral = (DateLiteral) expr;
+            return dateLiteral.getLongValue();
+        } else if (expr instanceof DecimalLiteral) {
+            DecimalLiteral decimalLiteral = (DecimalLiteral) expr;
+            return decimalLiteral.getValue();
+        } else if (expr instanceof FloatLiteral) {
+            FloatLiteral floatLiteral = (FloatLiteral) expr;
+            return floatLiteral.getValue();
+        } else if (expr instanceof IntLiteral) {
+            IntLiteral intLiteral = (IntLiteral) expr;
+            return intLiteral.getValue();
+        } else if (expr instanceof LargeIntLiteral) {
+            LargeIntLiteral largeIntLiteral = (LargeIntLiteral) expr;
+            return largeIntLiteral.getLongValue();
+        } else if (expr instanceof StringLiteral) {
+            StringLiteral stringLiteral = (StringLiteral) expr;
+            return stringLiteral.getStringValue();
+        }
+        return null;
+    }
+
+    /**
+     * Generate the URLs for BE to query ES: scroll URLs when limit <= 0, otherwise a single search URL.
+     **/
+    public static EsUrls genEsUrls(String index, String type, boolean 
docValueMode, long limit, long batchSize) {
+        String filterPath = docValueMode ? 
"filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields"
+                : 
"filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id";
+        if (limit <= 0) {
+            StringBuilder initScrollUrl = new StringBuilder();
+            StringBuilder nextScrollUrl = new StringBuilder();
+            initScrollUrl.append("/").append(index);
+            if (StringUtils.isNotBlank(type)) {
+                initScrollUrl.append("/").append(type);
+            }
+            
initScrollUrl.append("/_search?").append(filterPath).append("&terminate_after=")
+                    .append(batchSize);
+            nextScrollUrl.append("/_search/scroll?").append(filterPath);
+            return new EsUrls(null, initScrollUrl.toString(), 
nextScrollUrl.toString());
+        } else {
+            StringBuilder searchUrl = new StringBuilder();
+            searchUrl.append("/").append(index);
+            if (StringUtils.isNotBlank(type)) {
+                searchUrl.append("/").append(type);
+            }
+            
searchUrl.append("/_search?terminate_after=").append(limit).append("&").append(filterPath);
+            return new EsUrls(searchUrl.toString(), null, null);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
index bb5d416f56..f13db98fb5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.external.elasticsearch;
 
-import org.apache.doris.catalog.EsTable;
-
 import java.util.HashMap;
 import java.util.Map;
 
@@ -53,8 +51,6 @@ public class PartitionPhase implements SearchPhase {
     @Override
     public void postProcess(SearchContext context) throws DorisEsException {
         context.partitions(shardPartitions);
-        if (EsTable.TRANSPORT_HTTP.equals(context.esTable().getTransport())) {
-            context.partitions().addHttpAddress(nodesInfo);
-        }
+        context.partitions().addHttpAddress(nodesInfo);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java
 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java
index 92aea914bd..3dd800cb91 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java
@@ -18,10 +18,17 @@
 package org.apache.doris.external.elasticsearch;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
+import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Objects;
 
 
@@ -163,7 +170,11 @@ public final class QueryBuilders {
     /**
      * Base class to build various ES queries
      */
-    abstract static class QueryBuilder {
+    public abstract static class QueryBuilder {
+
+        private static final Logger LOG = 
LogManager.getLogger(QueryBuilder.class);
+
+        final ObjectMapper mapper = new ObjectMapper();
 
         /**
          * Convert query to JSON format
@@ -172,19 +183,64 @@ public final class QueryBuilders {
          * @throws IOException if IO error occurred
          */
         abstract void toJson(JsonGenerator out) throws IOException;
+
+        /**
+         * Convert query to a JSON string; logs a warning and returns null if serialization fails.
+         **/
+        public String toJson() {
+            StringWriter writer = new StringWriter();
+            try {
+                JsonGenerator gen = 
mapper.getFactory().createGenerator(writer);
+                this.toJson(gen);
+                gen.flush();
+                gen.close();
+            } catch (IOException e) {
+                LOG.warn("QueryBuilder toJson error", e);
+                return null;
+            }
+            return writer.toString();
+        }
+    }
+
+    /**
+     * Used for esquery: holds the raw JSON string and writes its top-level fields as-is.
+     **/
+    public static class EsQueryBuilder extends QueryBuilder {
+
+        private final String value;
+
+        public EsQueryBuilder(String value) {
+            this.value = value;
+        }
+
+        @Override
+        void toJson(JsonGenerator out) throws IOException {
+            JsonNode jsonNode = mapper.readTree(value);
+            out.writeStartObject();
+            Iterator<Entry<String, JsonNode>> fields = jsonNode.fields();
+            while (fields.hasNext()) {
+                Entry<String, JsonNode> field = fields.next();
+                out.writeFieldName(field.getKey());
+                out.writeObject(field.getValue());
+            }
+            out.writeEndObject();
+        }
     }
 
     /**
      * A Query that matches documents matching boolean combinations of other 
queries.
      */
-    static class BoolQueryBuilder extends QueryBuilder {
+    public static class BoolQueryBuilder extends QueryBuilder {
 
         private final List<QueryBuilder> mustClauses = new ArrayList<>();
         private final List<QueryBuilder> mustNotClauses = new ArrayList<>();
         private final List<QueryBuilder> filterClauses = new ArrayList<>();
         private final List<QueryBuilder> shouldClauses = new ArrayList<>();
 
-        BoolQueryBuilder must(QueryBuilder queryBuilder) {
+        /**
+         * Adds a required (must) clause, which must be non-null; used when generating the DSL for EsScanNode.
+         **/
+        public BoolQueryBuilder must(QueryBuilder queryBuilder) {
             Objects.requireNonNull(queryBuilder);
             mustClauses.add(queryBuilder);
             return this;
@@ -221,8 +277,7 @@ public final class QueryBuilders {
             out.writeEndObject();
         }
 
-        private void writeJsonArray(String field, List<QueryBuilder> clauses, 
JsonGenerator out)
-                throws IOException {
+        private void writeJsonArray(String field, List<QueryBuilder> clauses, 
JsonGenerator out) throws IOException {
             if (clauses.isEmpty()) {
                 return;
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
index f37704fd9d..fc557e7260 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -18,6 +18,7 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Catalog;
@@ -30,6 +31,12 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.external.elasticsearch.EsShardPartitions;
 import org.apache.doris.external.elasticsearch.EsShardRouting;
 import org.apache.doris.external.elasticsearch.EsTablePartitions;
+import org.apache.doris.external.elasticsearch.EsUrls;
+import org.apache.doris.external.elasticsearch.EsUtil;
+import org.apache.doris.external.elasticsearch.QueryBuilders;
+import org.apache.doris.external.elasticsearch.QueryBuilders.BoolQueryBuilder;
+import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TEsScanNode;
@@ -47,6 +54,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import lombok.SneakyThrows;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -58,6 +66,9 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+/**
+ * ScanNode for Elasticsearch.
+ **/
 public class EsScanNode extends ScanNode {
 
     private static final Logger LOG = LogManager.getLogger(EsScanNode.class);
@@ -68,8 +79,8 @@ public class EsScanNode extends ScanNode {
     private EsTablePartitions esTablePartitions;
     private List<TScanRangeLocations> shardScanRanges = Lists.newArrayList();
     private EsTable table;
-
-    boolean isFinalized = false;
+    private QueryBuilder queryBuilder;
+    private boolean isFinalized = false;
 
     public EsScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName) {
         super(id, desc, planNodeName, StatisticalType.ES_SCAN_NODE);
@@ -114,9 +125,8 @@ public class EsScanNode extends ScanNode {
      * return whether can use the doc_values scan
      * 0 and 1 are returned to facilitate Doris BE processing
      *
-     * @param desc            the fields needs to read from ES
+     * @param desc the fields to read from ES
      * @param docValueContext the mapping for docvalues fields from origin 
field to doc_value fields
-     * @return
      */
     private int useDocValueScan(TupleDescriptor desc, Map<String, String> 
docValueContext) {
         ArrayList<SlotDescriptor> slotDescriptors = desc.getSlots();
@@ -138,13 +148,11 @@ public class EsScanNode extends ScanNode {
         return useDocValue ? 1 : 0;
     }
 
+    @SneakyThrows
     @Override
     protected void toThrift(TPlanNode msg) {
-        if (EsTable.TRANSPORT_HTTP.equals(table.getTransport())) {
-            msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE;
-        } else {
-            msg.node_type = TPlanNodeType.ES_SCAN_NODE;
-        }
+        buildQuery();
+        msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE;
         Map<String, String> properties = Maps.newHashMap();
         properties.put(EsTable.USER, table.getUserName());
         properties.put(EsTable.PASSWORD, table.getPasswd());
@@ -155,6 +163,17 @@ public class EsScanNode extends ScanNode {
             esScanNode.setDocvalueContext(table.docValueContext());
             properties.put(EsTable.DOC_VALUES_MODE, 
String.valueOf(useDocValueScan(desc, table.docValueContext())));
         }
+        properties.put(EsTable.ES_DSL, queryBuilder.toJson());
+
+        // BE uses these URLs to query ES, adding the ES host_port and shardId; argument order is (limit, batchSize).
+        EsUrls esUrls = EsUtil.genEsUrls(table.getIndexName(), 
table.getMappingType(), table.isDocValueScanEnable(),
+                msg.limit, ConnectContext.get().getSessionVariable().batchSize);
+        if (esUrls.getSearchUrl() != null) {
+            properties.put(EsTable.SEARCH_URL, esUrls.getSearchUrl());
+        } else {
+            properties.put(EsTable.INIT_SCROLL_URL, esUrls.getInitScrollUrl());
+            properties.put(EsTable.NEXT_SCROLL_URL, esUrls.getNextScrollUrl());
+        }
         if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
             esScanNode.setFieldsContext(table.fieldsContext());
         }
@@ -181,8 +200,8 @@ public class EsScanNode extends ScanNode {
         // info is generated from es cluster state dynamically
         if (esTablePartitions == null) {
             if (table.getLastMetaDataSyncException() != null) {
-                throw new UserException("fetch es table [" + table.getName()
-                        + "] metadata failure: " + 
table.getLastMetaDataSyncException().getLocalizedMessage());
+                throw new UserException("fetch es table [" + table.getName() + 
"] metadata failure: "
+                        + 
table.getLastMetaDataSyncException().getLocalizedMessage());
             }
             throw new UserException("EsTable metadata has not been synced, Try 
it later");
         }
@@ -202,10 +221,8 @@ public class EsScanNode extends ScanNode {
             }
         }
         if (LOG.isDebugEnabled()) {
-            LOG.debug("partition prune finished, unpartitioned index [{}], "
-                            + "partitioned index [{}]",
-                    String.join(",", unPartitionedIndices),
-                    String.join(",", partitionedIndices));
+            LOG.debug("partition prune finished, unpartitioned index [{}], " + 
"partitioned index [{}]",
+                    String.join(",", unPartitionedIndices), String.join(",", 
partitionedIndices));
         }
         int size = backendList.size();
         int beIndex = random.nextInt(size);
@@ -217,8 +234,7 @@ public class EsScanNode extends ScanNode {
                 int numBe = Math.min(3, size);
                 List<TNetworkAddress> shardAllocations = new ArrayList<>();
                 for (EsShardRouting item : shardRouting) {
-                    
shardAllocations.add(EsTable.TRANSPORT_HTTP.equals(table.getTransport())
-                            ? item.getHttpAddress() : item.getAddress());
+                    shardAllocations.add(item.getHttpAddress());
                 }
 
                 Collections.shuffle(shardAllocations, random);
@@ -279,15 +295,13 @@ public class EsScanNode extends ScanNode {
      * with one or more indices some indices could be pruned by using 
partition info
      * in index settings currently only support range partition setting
      *
-     * @param partitionInfo
-     * @return
-     * @throws AnalysisException
+     * @param partitionInfo the partition info to prune by; may be null, in which case null is returned
      */
     private Collection<Long> partitionPrune(PartitionInfo partitionInfo) 
throws AnalysisException {
         if (partitionInfo == null) {
             return null;
         }
-        PartitionPruner partitionPruner = null;
+        PartitionPruner partitionPruner;
         switch (partitionInfo.getType()) {
             case RANGE: {
                 RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) 
partitionInfo;
@@ -319,22 +333,40 @@ public class EsScanNode extends ScanNode {
         }
 
         if (!conjuncts.isEmpty()) {
-            output.append(prefix).append("PREDICATES: ").append(
-                    getExplainString(conjuncts)).append("\n");
+            output.append(prefix).append("PREDICATES: 
").append(getExplainString(conjuncts)).append("\n");
             // reserved for later using: LOCAL_PREDICATES is processed by 
Doris EsScanNode
             output.append(prefix).append("LOCAL_PREDICATES: ").append(" 
").append("\n");
             // reserved for later using: REMOTE_PREDICATES is processed by 
remote ES Cluster
             output.append(prefix).append("REMOTE_PREDICATES: ").append(" 
").append("\n");
-            // reserved for later using: translate predicates to ES queryDSL
-            output.append(prefix).append("ES_QUERY_DSL: ").append(" 
").append("\n");
+            buildQuery();
+            output.append(prefix).append("ES_QUERY_DSL: 
").append(queryBuilder.toJson()).append("\n");
         } else {
             output.append(prefix).append("ES_QUERY_DSL: 
").append("{\"match_all\": {}}").append("\n");
         }
         String indexName = table.getIndexName();
         String typeName = table.getMappingType();
-        output.append(prefix)
-                .append(String.format("ES index/type: %s/%s", indexName, 
typeName))
-                .append("\n");
+        output.append(prefix).append(String.format("ES index/type: %s/%s", 
indexName, typeName)).append("\n");
         return output.toString();
     }
+
+    private void buildQuery() {
+        if (conjuncts.isEmpty()) {
+            queryBuilder = QueryBuilders.matchAllQuery();
+        } else {
+            boolean hasFilter = false;
+            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+            for (Expr expr : conjuncts) {
+                QueryBuilder queryBuilder = EsUtil.toEsDsl(expr);
+                if (queryBuilder != null) {
+                    hasFilter = true;
+                    boolQueryBuilder.must(queryBuilder);
+                }
+            }
+            if (!hasFilter) {
+                queryBuilder = QueryBuilders.matchAllQuery();
+            } else {
+                queryBuilder = boolQueryBuilder;
+            }
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
index ca7608923b..93fc870db4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
@@ -17,6 +17,17 @@
 
 package org.apache.doris.external.elasticsearch;
 
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BinaryPredicate.Operator;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LikePredicate;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.EsTable;
 import org.apache.doris.catalog.PrimitiveType;
@@ -29,6 +40,7 @@ import org.json.simple.JSONValue;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -40,22 +52,14 @@ public class EsUtilTest extends EsTestCase {
 
     private List<Column> columns = new ArrayList<>();
 
-    private String jsonStr = "{\"settings\": {\n"
-            + "               \"index\": {\n"
-            + "                  \"bpack\": {\n"
-            + "                     \"partition\": {\n"
-            + "                        \"upperbound\": \"12\"\n"
-            + "                     }\n"
-            + "                  },\n"
-            + "                  \"number_of_shards\": \"5\",\n"
+    private String jsonStr = "{\"settings\": {\n" + "               \"index\": 
{\n" + "                  \"bpack\": {\n"
+            + "                     \"partition\": {\n" + "                    
    \"upperbound\": \"12\"\n"
+            + "                     }\n" + "                  },\n" + "        
          \"number_of_shards\": \"5\",\n"
             + "                  \"provided_name\": \"indexa\",\n"
             + "                  \"creation_date\": \"1539328532060\",\n"
             + "                  \"number_of_replicas\": \"1\",\n"
-            + "                  \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n"
-            + "                  \"version\": {\n"
-            + "                     \"created\": \"5050099\"\n"
-            + "                  }\n"
-            + "               }\n"
+            + "                  \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n" + "  
                \"version\": {\n"
+            + "                     \"created\": \"5050099\"\n" + "            
      }\n" + "               }\n"
             + "            }}";
 
     /**
@@ -153,4 +157,137 @@ public class EsUtilTest extends EsTestCase {
         EsUtil.getJsonObject(json, 
"settings.index.bpack.partition.upperbound", 0);
     }
 
+    @Test
+    public void testBinaryPredicateConvertEsDsl() {
+        SlotRef k1 = new SlotRef(null, "k1");
+        IntLiteral intLiteral = new IntLiteral(3);
+        Expr eqExpr = new BinaryPredicate(Operator.EQ, k1, intLiteral);
+        Expr neExpr = new BinaryPredicate(Operator.NE, k1, intLiteral);
+        Expr leExpr = new BinaryPredicate(Operator.LE, k1, intLiteral);
+        Expr geExpr = new BinaryPredicate(Operator.GE, k1, intLiteral);
+        Expr ltExpr = new BinaryPredicate(Operator.LT, k1, intLiteral);
+        Expr gtExpr = new BinaryPredicate(Operator.GT, k1, intLiteral);
+        Expr efnExpr = new BinaryPredicate(Operator.EQ_FOR_NULL, new 
SlotRef(null, "k1"), new IntLiteral(3));
+        Assert.assertEquals("{\"term\":{\"k1\":3}}", 
EsUtil.toEsDsl(eqExpr).toJson());
+        Assert.assertEquals("{\"bool\":{\"must_not\":{\"term\":{\"k1\":3}}}}", 
EsUtil.toEsDsl(neExpr).toJson());
+        Assert.assertEquals("{\"range\":{\"k1\":{\"lte\":3}}}", 
EsUtil.toEsDsl(leExpr).toJson());
+        Assert.assertEquals("{\"range\":{\"k1\":{\"gte\":3}}}", 
EsUtil.toEsDsl(geExpr).toJson());
+        Assert.assertEquals("{\"range\":{\"k1\":{\"lt\":3}}}", 
EsUtil.toEsDsl(ltExpr).toJson());
+        Assert.assertEquals("{\"range\":{\"k1\":{\"gt\":3}}}", 
EsUtil.toEsDsl(gtExpr).toJson());
+        Assert.assertEquals("{\"term\":{\"k1\":3}}", 
EsUtil.toEsDsl(efnExpr).toJson());
+    }
+
+    @Test
+    public void testCompoundPredicateConvertEsDsl() {
+        SlotRef k1 = new SlotRef(null, "k1");
+        IntLiteral intLiteral1 = new IntLiteral(3);
+        SlotRef k2 = new SlotRef(null, "k2");
+        IntLiteral intLiteral2 = new IntLiteral(5);
+        BinaryPredicate binaryPredicate1 = new BinaryPredicate(Operator.EQ, 
k1, intLiteral1);
+        BinaryPredicate binaryPredicate2 = new BinaryPredicate(Operator.GT, 
k2, intLiteral2);
+        CompoundPredicate andPredicate = new 
CompoundPredicate(CompoundPredicate.Operator.AND, binaryPredicate1,
+                binaryPredicate2);
+        CompoundPredicate orPredicate = new 
CompoundPredicate(CompoundPredicate.Operator.OR, binaryPredicate1,
+                binaryPredicate2);
+        CompoundPredicate notPredicate = new 
CompoundPredicate(CompoundPredicate.Operator.NOT, binaryPredicate1, null);
+        
Assert.assertEquals("{\"bool\":{\"must\":[{\"term\":{\"k1\":3}},{\"range\":{\"k2\":{\"gt\":5}}}]}}",
+                EsUtil.toEsDsl(andPredicate).toJson());
+        
Assert.assertEquals("{\"bool\":{\"should\":[{\"term\":{\"k1\":3}},{\"range\":{\"k2\":{\"gt\":5}}}]}}",
+                EsUtil.toEsDsl(orPredicate).toJson());
+        Assert.assertEquals("{\"bool\":{\"must_not\":{\"term\":{\"k1\":3}}}}", 
EsUtil.toEsDsl(notPredicate).toJson());
+    }
+
+    @Test
+    public void testIsNullPredicateConvertEsDsl() {
+        SlotRef k1 = new SlotRef(null, "k1");
+        IsNullPredicate isNullPredicate = new IsNullPredicate(k1, false);
+        IsNullPredicate isNotNullPredicate = new IsNullPredicate(k1, true);
+        
Assert.assertEquals("{\"bool\":{\"must_not\":{\"exists\":{\"field\":\"k1\"}}}}",
+                EsUtil.toEsDsl(isNullPredicate).toJson());
+        Assert.assertEquals("{\"exists\":{\"field\":\"k1\"}}", 
EsUtil.toEsDsl(isNotNullPredicate).toJson());
+    }
+
+    @Test
+    public void testLikePredicateConvertEsDsl() {
+        SlotRef k1 = new SlotRef(null, "k1");
+        StringLiteral stringLiteral1 = new StringLiteral("%1%");
+        StringLiteral stringLiteral2 = new StringLiteral("*1*");
+        StringLiteral stringLiteral3 = new StringLiteral("1_2");
+        LikePredicate likePredicate1 = new 
LikePredicate(LikePredicate.Operator.LIKE, k1, stringLiteral1);
+        LikePredicate regexPredicate = new 
LikePredicate(LikePredicate.Operator.REGEXP, k1, stringLiteral2);
+        LikePredicate likePredicate2 = new 
LikePredicate(LikePredicate.Operator.LIKE, k1, stringLiteral3);
+        Assert.assertEquals("{\"wildcard\":{\"k1\":\"*1*\"}}", 
EsUtil.toEsDsl(likePredicate1).toJson());
+        Assert.assertEquals("{\"wildcard\":{\"k1\":\"*1*\"}}", 
EsUtil.toEsDsl(regexPredicate).toJson());
+        Assert.assertEquals("{\"wildcard\":{\"k1\":\"1?2\"}}", 
EsUtil.toEsDsl(likePredicate2).toJson());
+    }
+
+    @Test
+    public void testInPredicateConvertEsDsl() {
+        SlotRef k1 = new SlotRef(null, "k1");
+        IntLiteral intLiteral1 = new IntLiteral(3);
+        IntLiteral intLiteral2 = new IntLiteral(5);
+        List<Expr> intLiterals = new ArrayList<>();
+        intLiterals.add(intLiteral1);
+        intLiterals.add(intLiteral2);
+        InPredicate isInPredicate = new InPredicate(k1, intLiterals, false);
+        InPredicate isNotInPredicate = new InPredicate(k1, intLiterals, true);
+        Assert.assertEquals("{\"terms\":{\"k1\":[3,5]}}", 
EsUtil.toEsDsl(isInPredicate).toJson());
+        
Assert.assertEquals("{\"bool\":{\"must_not\":{\"terms\":{\"k1\":[3,5]}}}}",
+                EsUtil.toEsDsl(isNotInPredicate).toJson());
+    }
+
+    @Test
+    public void testFunctionCallConvertEsDsl() {
+        SlotRef k1 = new SlotRef(null, "k1");
+        String str = "{\"bool\":{\"must_not\":{\"terms\":{\"k1\":[3,5]}}}}";
+        StringLiteral stringLiteral = new StringLiteral(str);
+        List<Expr> exprs = new ArrayList<>();
+        exprs.add(k1);
+        exprs.add(stringLiteral);
+        FunctionCallExpr functionCallExpr = new FunctionCallExpr("esquery", 
exprs);
+        Assert.assertEquals(str, EsUtil.toEsDsl(functionCallExpr).toJson());
+
+        SlotRef k2 = new SlotRef(null, "k2");
+        IntLiteral intLiteral = new IntLiteral(5);
+        BinaryPredicate binaryPredicate = new BinaryPredicate(Operator.EQ, k2, 
intLiteral);
+        CompoundPredicate compoundPredicate = new 
CompoundPredicate(CompoundPredicate.Operator.AND, binaryPredicate,
+                functionCallExpr);
+        Assert.assertEquals(
+                
"{\"bool\":{\"must\":[{\"term\":{\"k2\":5}},{\"bool\":{\"must_not\":{\"terms\":{\"k1\":[3,5]}}}}]}}",
+                EsUtil.toEsDsl(compoundPredicate).toJson());
+    }
+
+    @Test
+    public void testGenEsUrls() {
+        EsUrls typeLimit = EsUtil.genEsUrls("test", "_doc", false, 10, 1024);
+        Assertions.assertEquals(
+                
"/test/_doc/_search?terminate_after=10&filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id",
+                typeLimit.getSearchUrl());
+        Assertions.assertNull(typeLimit.getInitScrollUrl());
+        Assertions.assertNull(typeLimit.getNextScrollUrl());
+
+        Assertions.assertEquals(
+                
"/test/_search?terminate_after=10&filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id",
+                EsUtil.genEsUrls("test", null, false, 10, 
1024).getSearchUrl());
+
+        EsUrls typeNoLimit = EsUtil.genEsUrls("test", "_doc", false, -1, 1024);
+        Assertions.assertEquals(
+                
"/test/_doc/_search?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id&terminate_after=1024",
+                typeNoLimit.getInitScrollUrl());
+        
Assertions.assertEquals("/_search/scroll?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id",
+                typeNoLimit.getNextScrollUrl());
+        Assertions.assertNull(typeNoLimit.getSearchUrl());
+
+        EsUrls noTypeNoLimit = EsUtil.genEsUrls("test", null, false, -1, 2048);
+        Assertions.assertEquals(
+                
"/test/_search?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id&terminate_after=2048",
+                noTypeNoLimit.getInitScrollUrl());
+        
Assertions.assertEquals("/_search/scroll?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id",
+                noTypeNoLimit.getNextScrollUrl());
+
+        EsUrls docValueTypeLimit = EsUtil.genEsUrls("test", "_doc", true, 100, 
1024);
+        Assertions.assertEquals(
+                
"/test/_doc/_search?terminate_after=100&filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields",
+                docValueTypeLimit.getSearchUrl());
+    }
 }
diff --git a/fe/pom.xml b/fe/pom.xml
index 0c3be8cca1..cb4abf93ec 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -230,7 +230,7 @@ under the License.
         <commons-collections.version>3.2.2</commons-collections.version>
         <scala.version>2.12.10</scala.version>
         <kryo.version>4.0.2</kryo.version>
-        <lombok.version>1.18.16</lombok.version>
+        <lombok.version>1.18.24</lombok.version>
         <tree-printer.version>1.2</tree-printer.version>
         <hamcrest.version>2.1</hamcrest.version>
         <httpclient.version>4.5.13</httpclient.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to