This is an automated email from the ASF dual-hosted git repository. zhaoc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new a467c6f [ES Connector] Add field context for string field keyword type (#3305)
a467c6f is described below

commit a467c6f81f5c7bd81c94401ee75ad84ab1dc4739
Author: Yunfeng,Wu <wuyunfen...@baidu.com>
AuthorDate: Mon Apr 13 23:07:33 2020 +0800

[ES Connector] Add field context for string field keyword type (#3305)

This PR is only a transitional approach. It would be better to move the predicate transformation from Doris BE to Doris FE. That way, Doris BE would only be responsible for fetching data from ES.

This PR adds an `enable_keyword_sniff` configuration item for creating an external Elasticsearch table. It defaults to true. When enabled, Doris sniffs the `keyword` sub-field of an analyzed `text` field. It then returns that sub-field's `json_path` (e.g. `k1.keyword`), which substitutes for the original column name when building predicates.

```
CREATE EXTERNAL TABLE `test` (
  `k1` varchar(20) COMMENT "",
  `create_time` datetime COMMENT ""
) ENGINE=ELASTICSEARCH
PROPERTIES (
  "hosts" = "http://10.74.167.16:8200",
  "user" = "root",
  "password" = "root",
  "index" = "test",
  "type" = "doc",
  "enable_keyword_sniff" = "true"
);
```

Note: `enable_keyword_sniff` defaults to "true".

Run this SQL:

```
select * from test where k1 = "wu yun feng"
```

Output predicate DSL:

```
{"term":{"k1.keyword":"wu yun feng"}}
```

This PR also removes the Elasticsearch version detection logic, since it is currently unused; it may be needed again in the future.
--- be/src/exec/es/es_predicate.cpp | 27 +++++++-- be/src/exec/es/es_predicate.h | 5 ++ be/src/exec/es_http_scan_node.cpp | 5 ++ be/src/exec/es_http_scan_node.h | 1 + .../java/org/apache/doris/catalog/Catalog.java | 5 +- .../java/org/apache/doris/catalog/EsTable.java | 36 +++++++++++- .../org/apache/doris/external/EsStateStore.java | 67 +++++++++++++--------- .../java/org/apache/doris/planner/EsScanNode.java | 3 + gensrc/thrift/PlanNodes.thrift | 12 ++++ 9 files changed, 126 insertions(+), 35 deletions(-) diff --git a/be/src/exec/es/es_predicate.cpp b/be/src/exec/es/es_predicate.cpp index 95c5724..c8528c8 100644 --- a/be/src/exec/es/es_predicate.cpp +++ b/be/src/exec/es/es_predicate.cpp @@ -253,9 +253,13 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) { } ExtLiteral literal(expr->type().type, _context->get_value(expr, NULL)); + std::string col = slot_desc->col_name(); + if (_field_context.find(col) != _field_context.end()) { + col = _field_context[col]; + } ExtPredicate* predicate = new ExtBinaryPredicate( TExprNodeType::BINARY_PRED, - slot_desc->col_name(), + col, slot_desc->type(), op, literal); @@ -298,8 +302,12 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) { } else { is_not_null = true; } + std::string col = slot_desc->col_name(); + if (_field_context.find(col) != _field_context.end()) { + col = _field_context[col]; + } // use TExprNodeType::IS_NULL_PRED for BooleanQueryBuilder translate - ExtIsNullPredicate* predicate = new ExtIsNullPredicate(TExprNodeType::IS_NULL_PRED, slot_desc->col_name(), slot_desc->type(), is_not_null); + ExtIsNullPredicate* predicate = new ExtIsNullPredicate(TExprNodeType::IS_NULL_PRED, col, slot_desc->type(), is_not_null); _disjuncts.push_back(predicate); return Status::OK(); } @@ -331,11 +339,14 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) { if (type != TYPE_VARCHAR && type != TYPE_CHAR) { return Status::InternalError("build disjuncts failed: like value is not a string"); } - 
+ std::string col = slot_desc->col_name(); + if (_field_context.find(col) != _field_context.end()) { + col = _field_context[col]; + } ExtLiteral literal(type, _context->get_value(expr, NULL)); ExtPredicate* predicate = new ExtLikePredicate( TExprNodeType::LIKE_PRED, - slot_desc->col_name(), + col, slot_desc->type(), literal); @@ -380,11 +391,14 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) { in_pred_values.emplace_back(literal); iter->next(); } - + std::string col = slot_desc->col_name(); + if (_field_context.find(col) != _field_context.end()) { + col = _field_context[col]; + } ExtPredicate* predicate = new ExtInPredicate( TExprNodeType::IN_PRED, pred->is_not_in(), - slot_desc->col_name(), + col, slot_desc->type(), in_pred_values); _disjuncts.push_back(predicate); @@ -399,6 +413,7 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) { std::vector<EsPredicate*> conjuncts; for (int i = 0; i < conjunct->get_num_children(); ++i) { EsPredicate *predicate = _pool->add(new EsPredicate(_context, _tuple_desc, _pool)); + predicate->set_field_context(_field_context); Status status = predicate->build_disjuncts_list(conjunct->children()[i]); if (status.ok()) { conjuncts.push_back(predicate); diff --git a/be/src/exec/es/es_predicate.h b/be/src/exec/es/es_predicate.h index 6e1958b..3618873 100644 --- a/be/src/exec/es/es_predicate.h +++ b/be/src/exec/es/es_predicate.h @@ -198,6 +198,10 @@ public: return _es_query_status; } + void set_field_context(const std::map<std::string, std::string>& field_context) { + _field_context = field_context; + } + private: Status build_disjuncts_list(const Expr* conjunct); bool is_match_func(const Expr* conjunct); @@ -209,6 +213,7 @@ private: std::vector<ExtPredicate*> _disjuncts; Status _es_query_status; ObjectPool *_pool; + std::map<std::string, std::string> _field_context; }; } diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp index c6c634b..cb5eeb2 100644 --- 
a/be/src/exec/es_http_scan_node.cpp +++ b/be/src/exec/es_http_scan_node.cpp @@ -59,6 +59,10 @@ Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) { if (tnode.es_scan_node.__isset.docvalue_context) { _docvalue_context = tnode.es_scan_node.docvalue_context; } + + if (tnode.es_scan_node.__isset.fields_context) { + _fields_context = tnode.es_scan_node.fields_context; + } return Status::OK(); } @@ -93,6 +97,7 @@ Status EsHttpScanNode::build_conjuncts_list() { for (int i = 0; i < _conjunct_ctxs.size(); ++i) { EsPredicate* predicate = _pool->add( new EsPredicate(_conjunct_ctxs[i], _tuple_desc, _pool)); + predicate->set_field_context(_fields_context); status = predicate->build_disjuncts_list(); if (status.ok()) { _predicates.push_back(predicate); diff --git a/be/src/exec/es_http_scan_node.h b/be/src/exec/es_http_scan_node.h index 9990a76..89cca57 100644 --- a/be/src/exec/es_http_scan_node.h +++ b/be/src/exec/es_http_scan_node.h @@ -100,6 +100,7 @@ private: std::vector<std::promise<Status>> _scanners_status; std::map<std::string, std::string> _properties; std::map<std::string, std::string> _docvalue_context; + std::map<std::string, std::string> _fields_context; std::vector<TScanRangeParams> _scan_ranges; std::vector<std::string> _column_names; diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 38cd584..928b30b 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -3997,8 +3997,9 @@ public class Catalog { sb.append("\"password\" = \"").append(hidePassword ? 
"" : esTable.getPasswd()).append("\",\n"); sb.append("\"index\" = \"").append(esTable.getIndexName()).append("\",\n"); sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n"); - sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\"\n"); - sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\"\n"); + sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\",\n"); + sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n"); + sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n"); sb.append(")"); } sb.append(";"); diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java index 2045d6c..252d962 100644 --- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java @@ -56,6 +56,7 @@ public class EsTable extends Table { public static final String TRANSPORT_HTTP = "http"; public static final String TRANSPORT_THRIFT = "thrift"; public static final String DOC_VALUE_SCAN = "enable_docvalue_scan"; + public static final String KEYWORD_SNIFF = "enable_keyword_sniff"; private String hosts; private String[] seeds; @@ -69,6 +70,7 @@ public class EsTable extends Table { private PartitionInfo partitionInfo; private EsTableState esTableState; private boolean enableDocValueScan = false; + private boolean enableKeywordSniff = true; public EsMajorVersion majorVersion = null; @@ -93,6 +95,8 @@ public class EsTable extends Table { // use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw` private Map<String, String> docValueContext = new HashMap<>(); + private Map<String, String> fieldsContext = new HashMap<>(); + public EsTable() { super(TableType.ELASTICSEARCH); } @@ -104,6 +108,13 @@ public class EsTable extends Table { validate(properties); 
} + public void addFetchField(String originName, String replaceName) { + fieldsContext.put(originName, replaceName); + } + + public Map<String, String> fieldsContext() { + return fieldsContext; + } public void addDocValueField(String name, String fieldsName) { docValueContext.put(name, fieldsName); @@ -117,6 +128,10 @@ public class EsTable extends Table { return enableDocValueScan; } + public boolean isKeywordSniffEnable() { + return enableKeywordSniff; + } + private void validate(Map<String, String> properties) throws DdlException { if (properties == null) { @@ -159,7 +174,7 @@ public class EsTable extends Table { } } - // Explicit setting for cluster version to avoid detecting version failure + // enable doc value scan for Elasticsearch if (properties.containsKey(DOC_VALUE_SCAN)) { try { enableDocValueScan = Boolean.parseBoolean(properties.get(DOC_VALUE_SCAN).trim()); @@ -172,6 +187,18 @@ public class EsTable extends Table { enableDocValueScan = false; } + if (properties.containsKey(KEYWORD_SNIFF)) { + try { + enableKeywordSniff = Boolean.parseBoolean(properties.get(KEYWORD_SNIFF).trim()); + } catch (Exception e) { + throw new DdlException("fail to parse enable_keyword_sniff, enable_keyword_sniff= " + + properties.get(VERSION).trim() + " ,`enable_keyword_sniff`" + + " shoud be like 'true' or 'false', value should be double quotation marks"); + } + } else { + enableKeywordSniff = true; + } + if (!Strings.isNullOrEmpty(properties.get(TYPE)) && !Strings.isNullOrEmpty(properties.get(TYPE).trim())) { mappingType = properties.get(TYPE).trim(); @@ -194,6 +221,7 @@ public class EsTable extends Table { tableContext.put("majorVersion", majorVersion.toString()); } tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan)); + tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff)); } public TTableDescriptor toThrift() { @@ -278,6 +306,11 @@ public class EsTable extends Table { } enableDocValueScan = 
Boolean.parseBoolean(tableContext.get("enableDocValueScan")); + if (tableContext.containsKey("enableKeywordSniff")) { + enableKeywordSniff = Boolean.parseBoolean(tableContext.get("enableKeywordSniff")); + } else { + enableKeywordSniff = true; + } PartitionType partType = PartitionType.valueOf(Text.readString(in)); if (partType == PartitionType.UNPARTITIONED) { @@ -311,6 +344,7 @@ public class EsTable extends Table { tableContext.put("mappingType", mappingType); tableContext.put("transport", transport); tableContext.put("enableDocValueScan", "false"); + tableContext.put(KEYWORD_SNIFF, "true"); } } diff --git a/fe/src/main/java/org/apache/doris/external/EsStateStore.java b/fe/src/main/java/org/apache/doris/external/EsStateStore.java index c121c48..0ac05f2 100644 --- a/fe/src/main/java/org/apache/doris/external/EsStateStore.java +++ b/fe/src/main/java/org/apache/doris/external/EsStateStore.java @@ -90,9 +90,6 @@ public class EsStateStore extends MasterDaemon { esTable.getUserName(), esTable.getPasswd()); // if user not specify the es version, try to get the remote cluster versoin // in the future, we maybe need this version - if (esTable.majorVersion == null) { - esTable.majorVersion = client.version(); - } String indexMetaData = client.getIndexMetaData(esTable.getIndexName()); if (indexMetaData == null) { continue; @@ -197,7 +194,7 @@ public class EsStateStore extends MasterDaemon { // {"city": "city.raw"} JSONObject indicesMetaMap = jsonObject.getJSONObject("metadata").getJSONObject("indices"); JSONObject indexMetaMap = indicesMetaMap.optJSONObject(esTable.getIndexName()); - if (esTable.isDocValueScanEnable() && indexMetaMap != null) { + if (indexMetaMap != null && (esTable.isKeywordSniffEnable() || esTable.isDocValueScanEnable())) { JSONObject mappings = indexMetaMap.optJSONObject("mappings"); JSONObject rootSchema = mappings.optJSONObject(esTable.getMappingType()); JSONObject schema = rootSchema.optJSONObject("properties"); @@ -209,36 +206,54 @@ public class 
EsStateStore extends MasterDaemon { } JSONObject fieldObject = schema.optJSONObject(colName); String fieldType = fieldObject.optString("type"); - if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) { - JSONObject fieldsObject = fieldObject.optJSONObject("fields"); - if (fieldsObject != null) { - for (String key : fieldsObject.keySet()) { - JSONObject innerTypeObject = fieldsObject.optJSONObject(key); - if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(innerTypeObject.optString("type"))) { - continue; - } - if (innerTypeObject.has("doc_values")) { - boolean docValue = innerTypeObject.getBoolean("doc_values"); - if (docValue) { - esTable.addDocValueField(colName, colName); + // string-type field used keyword type to generate predicate + if (esTable.isKeywordSniffEnable()) { + // if text field type seen, we should use the `field` keyword type? + if ("text".equals(fieldType)) { + JSONObject fieldsObject = fieldObject.optJSONObject("fields"); + if (fieldsObject != null) { + for (String key : fieldsObject.keySet()) { + JSONObject innerTypeObject = fieldsObject.optJSONObject(key); + // just for text type + if ("keyword".equals(innerTypeObject.optString("type"))) { + esTable.addFetchField(colName, colName + "." + key); } - } else { - // a : {c : {}} -> a -> a.c - esTable.addDocValueField(colName, colName + "." 
+ key); } } } - // skip this field - continue; } - // set doc_value = false manually - if (fieldObject.has("doc_values")) { - boolean docValue = fieldObject.optBoolean("doc_values"); - if (!docValue) { + if (esTable.isDocValueScanEnable()) { + if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) { + JSONObject fieldsObject = fieldObject.optJSONObject("fields"); + if (fieldsObject != null) { + for (String key : fieldsObject.keySet()) { + JSONObject innerTypeObject = fieldsObject.optJSONObject(key); + if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(innerTypeObject.optString("type"))) { + continue; + } + if (innerTypeObject.has("doc_values")) { + boolean docValue = innerTypeObject.getBoolean("doc_values"); + if (docValue) { + esTable.addDocValueField(colName, colName); + } + } else { + // a : {c : {}} -> a -> a.c + esTable.addDocValueField(colName, colName + "." + key); + } + } + } + // skip this field continue; } + // set doc_value = false manually + if (fieldObject.has("doc_values")) { + boolean docValue = fieldObject.optBoolean("doc_values"); + if (!docValue) { + continue; + } + } + esTable.addDocValueField(colName, colName); } - esTable.addDocValueField(colName, colName); } } diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index 8c0e5bb..10e878d 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -123,6 +123,9 @@ public class EsScanNode extends ScanNode { if (table.isDocValueScanEnable()) { esScanNode.setDocvalue_context(table.docValueContext()); } + if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) { + esScanNode.setFields_context(table.fieldsContext()); + } msg.es_scan_node = esScanNode; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index ef51eb9..5f04435 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ 
b/gensrc/thrift/PlanNodes.thrift @@ -222,6 +222,18 @@ struct TEsScanNode { // {"city": "city.raw"} // use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw` 3: optional map<string, string> docvalue_context + // used to indicate which string-type field predicate should used xxx.keyword etc. + // "k1": { + // "type": "text", + // "fields": { + // "keyword": { + // "type": "keyword", + // "ignore_above": 256 + // } + // } + // } + // k1 > 'abc' -> k1.keyword > 'abc' + 4: optional map<string, string> fields_context } struct TMiniLoadEtlFunction { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org