This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 3ba38e3 [Doris On ES][Refactor] refactor and enchanment ES sync meta logic (#4012) 3ba38e3 is described below commit 3ba38e3381d2af245f1ef6f806bec050dbe4e233 Author: Yunfeng,Wu <wuyunfen...@baidu.com> AuthorDate: Tue Jul 7 09:04:05 2020 +0800 [Doris On ES][Refactor] refactor and enchanment ES sync meta logic (#4012) After PR #3454 was merged, we should refactor and reorganize some logic for the long-term, sustainable iteration of Doris On ES. To facilitate code review, I will divide this work into multiple PRs (there is other WIP work that I still need to think through carefully). This PR includes: 1. introduce SearchContext to hold all the state we need 2. divide the meta-sync logic into three phases 3. modify some processing logic 4. introduce version detection logic for future use --- .../java/org/apache/doris/catalog/EsTable.java | 74 +++------- .../external/elasticsearch/DorisEsException.java | 5 +- .../external/elasticsearch/EsMajorVersion.java | 27 +++- .../external/elasticsearch/EsMetaStateTracker.java | 57 ++++++++ .../doris/external/elasticsearch/EsNodeInfo.java | 2 +- .../doris/external/elasticsearch/EsRepository.java | 9 +- .../doris/external/elasticsearch/EsRestClient.java | 81 ++++++++--- .../external/elasticsearch/EsShardPartitions.java | 7 +- .../{EsFieldInfos.java => MappingPhase.java} | 161 +++++++++------------ .../external/elasticsearch/PartitionPhase.java | 51 +++++++ .../external/elasticsearch/SearchContext.java | 145 +++++++++++++++++++ .../{DorisEsException.java => SearchPhase.java} | 23 ++- .../doris/external/elasticsearch/VersionPhase.java | 55 +++++++ .../external/elasticsearch/EsRepositoryTest.java | 128 ---------------- .../doris/external/elasticsearch/EsUtilTest.java | 40 ++--- 15 files changed, 526 insertions(+), 339 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java index 2a26d51..c885fb5 100644 --- 
a/fe/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java @@ -20,21 +20,19 @@ package org.apache.doris.catalog; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; -import org.apache.doris.external.elasticsearch.EsFieldInfos; import org.apache.doris.external.elasticsearch.EsMajorVersion; -import org.apache.doris.external.elasticsearch.EsNodeInfo; +import org.apache.doris.external.elasticsearch.EsMetaStateTracker; import org.apache.doris.external.elasticsearch.EsRestClient; -import org.apache.doris.external.elasticsearch.EsShardPartitions; import org.apache.doris.external.elasticsearch.EsTablePartitions; import org.apache.doris.thrift.TEsTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import com.google.common.base.Strings; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import com.google.common.base.Strings; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -83,27 +81,6 @@ public class EsTable extends Table { private Map<String, String> tableContext = new HashMap<>(); - // used to indicate which fields can get from ES docavalue - // because elasticsearch can have "fields" feature, field can have - // two or more types, the first type maybe have not docvalue but other - // can have, such as (text field not have docvalue, but keyword can have): - // "properties": { - // "city": { - // "type": "text", - // "fields": { - // "raw": { - // "type": "keyword" - // } - // } - // } - // } - // then the docvalue context provided the mapping between the select field and real request field : - // {"city": "city.raw"} - // use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw` - private Map<String, String> docValueContext = new HashMap<>(); - - private Map<String, 
String> fieldsContext = new HashMap<>(); - // record the latest and recently exception when sync ES table metadata (mapping, shard location) private Throwable lastMetaDataSyncException = null; @@ -118,21 +95,13 @@ public class EsTable extends Table { validate(properties); } - public void addFieldInfos(EsFieldInfos esFieldInfos) { - if (enableKeywordSniff && esFieldInfos.getFieldsContext() != null) { - fieldsContext = esFieldInfos.getFieldsContext(); - } - if (enableDocValueScan && esFieldInfos.getDocValueContext() != null) { - docValueContext = esFieldInfos.getDocValueContext(); - } - } public Map<String, String> fieldsContext() { - return fieldsContext; + return esMetaStateTracker.searchContext().fetchFieldsContext(); } public Map<String, String> docValueContext() { - return docValueContext; + return esMetaStateTracker.searchContext().docValueFieldsContext(); } public boolean isDocValueScanEnable() { @@ -179,9 +148,12 @@ public class EsTable extends Table { if (properties.containsKey(VERSION)) { try { majorVersion = EsMajorVersion.parse(properties.get(VERSION).trim()); + if (majorVersion.before(EsMajorVersion.V_5_X)) { + throw new DdlException("Unsupported/Unknown ES Cluster version [" + properties.get(VERSION) + "] "); + } } catch (Exception e) { throw new DdlException("fail to parse ES major version, version= " - + properties.get(VERSION).trim() + ", shoud be like '6.5.3' "); + + properties.get(VERSION).trim() + ", should be like '6.5.3' "); } } @@ -399,6 +371,10 @@ public class EsTable extends Table { this.esTablePartitions = esTablePartitions; } + public EsMajorVersion esVersion() { + return majorVersion; + } + public Throwable getLastMetaDataSyncException() { return lastMetaDataSyncException; } @@ -407,24 +383,20 @@ public class EsTable extends Table { this.lastMetaDataSyncException = lastMetaDataSyncException; } + private EsMetaStateTracker esMetaStateTracker; + /** - * sync es index meta from remote + * sync es index meta from remote ES Cluster + * * @param 
client esRestClient */ - public void syncESIndexMeta(EsRestClient client) { + public void syncTableMetaData(EsRestClient client) { + if (esMetaStateTracker == null) { + esMetaStateTracker = new EsMetaStateTracker(client, this); + } try { - EsFieldInfos fieldInfos = client.getFieldInfos(this.indexName, this.mappingType, this.fullSchema); - EsShardPartitions esShardPartitions = client.getShardPartitions(this.indexName); - Map<String, EsNodeInfo> nodesInfo = client.getHttpNodes(); - if (this.enableKeywordSniff || this.enableDocValueScan) { - addFieldInfos(fieldInfos); - } - - this.esTablePartitions = EsTablePartitions.fromShardPartitions(this, esShardPartitions); - - if (EsTable.TRANSPORT_HTTP.equals(getTransport())) { - this.esTablePartitions.addHttpAddress(nodesInfo); - } + esMetaStateTracker.run(); + this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions(); } catch (Throwable e) { LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", this.name, e); this.esTablePartitions = null; diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java index c1ea1f4..dd7964d 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java @@ -17,11 +17,8 @@ package org.apache.doris.external.elasticsearch; -import org.apache.doris.common.UserException; -public class DorisEsException extends UserException { - - private static final long serialVersionUID = 7912833584319374692L; +public class DorisEsException extends RuntimeException { public DorisEsException(String msg) { super(msg); diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java index 2319f58..d16fbd8 100644 --- 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMajorVersion.java @@ -20,14 +20,18 @@ package org.apache.doris.external.elasticsearch; /** * Elasticsearch major version information, useful to check client's query compatibility with the Rest API. - * + * <p> * reference es-hadoop: - * */ public class EsMajorVersion { + + public static final EsMajorVersion V_0_X = new EsMajorVersion((byte) 0, "0.x"); + public static final EsMajorVersion V_1_X = new EsMajorVersion((byte) 1, "1.x"); + public static final EsMajorVersion V_2_X = new EsMajorVersion((byte) 2, "2.x"); public static final EsMajorVersion V_5_X = new EsMajorVersion((byte) 5, "5.x"); public static final EsMajorVersion V_6_X = new EsMajorVersion((byte) 6, "6.x"); public static final EsMajorVersion V_7_X = new EsMajorVersion((byte) 7, "7.x"); + public static final EsMajorVersion V_8_X = new EsMajorVersion((byte) 8, "8.x"); public static final EsMajorVersion LATEST = V_7_X; public final byte major; @@ -62,7 +66,16 @@ public class EsMajorVersion { return version.major >= major; } - public static EsMajorVersion parse(String version) throws Exception { + public static EsMajorVersion parse(String version) throws DorisEsException { + if (version.startsWith("0.")) { + return new EsMajorVersion((byte) 0, version); + } + if (version.startsWith("1.")) { + return new EsMajorVersion((byte) 1, version); + } + if (version.startsWith("2.")) { + return new EsMajorVersion((byte) 2, version); + } if (version.startsWith("5.")) { return new EsMajorVersion((byte) 5, version); } @@ -72,8 +85,12 @@ public class EsMajorVersion { if (version.startsWith("7.")) { return new EsMajorVersion((byte) 7, version); } - throw new Exception("Unsupported/Unknown Elasticsearch version [" + version + "]." + - "Highest supported version is [" + LATEST.version + "]. 
You may need to upgrade ES-Hadoop."); + // used for the next released ES version + if (version.startsWith("8.")) { + return new EsMajorVersion((byte) 8, version); + } + throw new DorisEsException("Unsupported/Unknown ES Cluster version [" + version + "]." + + "Highest supported version is [" + LATEST.version + "]."); } @Override diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java new file mode 100644 index 0000000..f41837f --- /dev/null +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsMetaStateTracker.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.elasticsearch; + +import org.apache.doris.catalog.EsTable; + +import java.util.LinkedList; +import java.util.List; + +/** + * It is responsible for this class to schedule all network request sent to remote ES Cluster + * Request sequence + * 1. GET / + * 2. GET {index}/_mapping + * 3. 
GET {index}/_search_shards + * <p> + * note: step 1 is not necessary + */ +public class EsMetaStateTracker { + + private List<SearchPhase> builtinSearchPhase = new LinkedList<>(); + private SearchContext searchContext; + + public EsMetaStateTracker(EsRestClient client, EsTable esTable) { + builtinSearchPhase.add(new VersionPhase(client)); + builtinSearchPhase.add(new MappingPhase(client)); + builtinSearchPhase.add(new PartitionPhase(client)); + searchContext = new SearchContext(esTable); + } + + public SearchContext searchContext() { + return searchContext; + } + + public void run() throws DorisEsException { + for (SearchPhase searchPhase : builtinSearchPhase) { + searchPhase.preProcess(searchContext); + searchPhase.execute(searchContext); + searchPhase.postProcess(searchContext); + } + } +} diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java index 4e11f9d..73d8daf 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsNodeInfo.java @@ -38,7 +38,7 @@ public class EsNodeInfo { private boolean hasThrift; private TNetworkAddress thriftAddress; - public EsNodeInfo(String id, Map<String, Object> map) throws Exception { + public EsNodeInfo(String id, Map<String, Object> map) throws DorisEsException { this.id = id; EsMajorVersion version = EsMajorVersion.parse((String) map.get("version")); this.name = (String) map.get("name"); diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java index 2cfcf4c..98e895e 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRepository.java @@ -17,6 +17,7 @@ package org.apache.doris.external.elasticsearch; + import 
org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.EsTable; @@ -24,7 +25,9 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; + import com.google.common.collect.Maps; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -33,7 +36,8 @@ import java.util.Map; /** - * It is used to call es api to get shard allocation state + * It is responsible for loading all ES external table's meta-data such as `fields`, `partitions` periodically, + * playing the `repo` role at Doris On ES */ public class EsRepository extends MasterDaemon { @@ -69,8 +73,7 @@ public class EsRepository extends MasterDaemon { protected void runAfterCatalogReady() { for (EsTable esTable : esTables.values()) { try { - EsRestClient client = esClients.get(esTable.getId()); - esTable.syncESIndexMeta(client); + esTable.syncTableMetaData(esClients.get(esTable.getId())); } catch (Throwable e) { LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", esTable.getName(), e); esTable.setEsTablePartitions(null); diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index dd7f93f..f2868aa 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -17,7 +17,6 @@ package org.apache.doris.external.elasticsearch; -import org.apache.doris.catalog.Column; import org.apache.http.HttpHeaders; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -26,37 +25,38 @@ import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.map.DeserializationConfig; import org.codehaus.jackson.map.ObjectMapper; import 
org.codehaus.jackson.map.SerializationConfig; + import java.io.IOException; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; + import okhttp3.Credentials; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; public class EsRestClient { - + private static final Logger LOG = LogManager.getLogger(EsRestClient.class); private ObjectMapper mapper; - + { mapper = new ObjectMapper(); mapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false); mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false); } - + private static OkHttpClient networkClient = new OkHttpClient.Builder() .readTimeout(10, TimeUnit.SECONDS) .build(); - + private Request.Builder builder; private String[] nodes; private String currentNode; private int currentNodeIndex = 0; - + public EsRestClient(String[] nodes, String authUser, String authPassword) { this.nodes = nodes; this.builder = new Request.Builder(); @@ -66,7 +66,7 @@ public class EsRestClient { } this.currentNode = nodes[currentNodeIndex]; } - + private void selectNextNode() { currentNodeIndex++; // reroute, because the previously failed node may have already been restored @@ -75,8 +75,8 @@ public class EsRestClient { } currentNode = nodes[currentNodeIndex]; } - - public Map<String, EsNodeInfo> getHttpNodes() throws Exception { + + public Map<String, EsNodeInfo> getHttpNodes() throws DorisEsException { Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes"); if (nodesData == null) { return Collections.emptyMap(); @@ -90,35 +90,67 @@ public class EsRestClient { } return nodesMap; } - - public EsFieldInfos getFieldInfos(String indexName, String docType, List<Column> colList) throws Exception { + + /** + * Get remote ES Cluster version + * + * @return + * @throws Exception + */ + public EsMajorVersion version() throws DorisEsException { + Map<String, Object> result = get("/", null); + if (result 
== null) { + throw new DorisEsException("Unable to retrieve ES main cluster info."); + } + Map<String, String> versionBody = (Map<String, String>) result.get("version"); + return EsMajorVersion.parse(versionBody.get("number")); + } + + /** + * Get mapping for indexName + * + * @param indexName + * @return + * @throws Exception + */ + public String getMapping(String indexName, boolean includeTypeName) throws DorisEsException { String path = indexName + "/_mapping"; + if (includeTypeName) { + path += "?include_type_name=true"; + } String indexMapping = execute(path); if (indexMapping == null) { - throw new DorisEsException( "index[" + indexName + "] not found for the Elasticsearch Cluster"); + throw new DorisEsException("index[" + indexName + "] not found"); } - return EsFieldInfos.fromMapping(colList, indexName, indexMapping, docType); + return indexMapping; } - - public EsShardPartitions getShardPartitions(String indexName) throws Exception { + + /** + * Get Shard location + * + * @param indexName + * @return + * @throws DorisEsException + */ + public EsShardPartitions searchShards(String indexName) throws DorisEsException { String path = indexName + "/_search_shards"; String searchShards = execute(path); if (searchShards == null) { - throw new DorisEsException( "index[" + indexName + "] search_shards not found for the Elasticsearch Cluster"); + throw new DorisEsException("request index [" + indexName + "] search_shards failure"); } return EsShardPartitions.findShardPartitions(indexName, searchShards); } - + /** * execute request for specific path,it will try again nodes.length times if it fails * * @param path the path must not leading with '/' * @return response */ - private String execute(String path) throws Exception { + private String execute(String path) throws DorisEsException { int retrySize = nodes.length; - Exception scratchExceptionForThrow = null; + DorisEsException scratchExceptionForThrow = null; for (int i = 0; i < retrySize; i++) { // maybe should 
add HTTP schema to the address // actually, at this time we can only process http protocol @@ -144,7 +176,7 @@ public class EsRestClient { } } catch (IOException e) { LOG.warn("request node [{}] [{}] failures {}, try next nodes", currentNode, path, e); - scratchExceptionForThrow = e; + scratchExceptionForThrow = new DorisEsException(e.getMessage()); } finally { if (response != null) { response.close(); @@ -158,11 +190,11 @@ public class EsRestClient { } return null; } - - public <T> T get(String q, String key) throws Exception { + + public <T> T get(String q, String key) throws DorisEsException { return parseContent(execute(q), key); } - + @SuppressWarnings("unchecked") private <T> T parseContent(String response, String key) { Map<String, Object> map = Collections.emptyMap(); @@ -171,6 +203,7 @@ public class EsRestClient { map = mapper.readValue(jsonParser, Map.class); } catch (IOException ex) { LOG.error("parse es response failure: [{}]", response); + throw new DorisEsException(ex.getMessage()); } return (T) (key != null ? 
map.get(key) : map); } diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java index 5caa6c0..41687cf 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java @@ -59,7 +59,8 @@ public class EsShardPartitions { * @return shardRoutings is used for searching */ public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws DorisEsException { - EsShardPartitions indexState = new EsShardPartitions(indexName); + + EsShardPartitions partitions = new EsShardPartitions(indexName); JSONObject jsonObject = new JSONObject(searchShards); JSONArray shards = jsonObject.getJSONArray("shards"); int length = shards.length(); @@ -87,9 +88,9 @@ public class EsShardPartitions { if (singleShardRouting.isEmpty()) { LOG.warn("could not find a healthy allocation for [{}][{}]", indexName, i); } - indexState.addShardRouting(i, singleShardRouting); + partitions.addShardRouting(i, singleShardRouting); } - return indexState; + return partitions; } public void addHttpAddress(Map<String, EsNodeInfo> nodesInfo) { diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java similarity index 51% rename from fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java rename to fe/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java index 5edb80b..b6d16e2 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/MappingPhase.java @@ -19,122 +19,94 @@ package org.apache.doris.external.elasticsearch; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.EsTable; -import 
org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; + import org.json.JSONObject; -import java.util.HashMap; + import java.util.Iterator; -import java.util.List; -import java.util.Map; /** - * It is used to hold the field information obtained from es, currently including the fields and docValue, - * it will eventually be added to the EsTable - **/ -public class EsFieldInfos { - - private static final Logger LOG = LogManager.getLogger(EsFieldInfos.class); - - // userId => userId.keyword - private Map<String, String> fieldsContext; - - // city => city.raw - private Map<String, String> docValueContext; - - public EsFieldInfos(Map<String, String> fieldsContext, Map<String, String> docValueContext) { - this.fieldsContext = fieldsContext; - this.docValueContext = docValueContext; + * Get index mapping from remote ES Cluster, and resolved `keyword` and `doc_values` field + * Later we can use it to parse all relevant indexes + */ +public class MappingPhase implements SearchPhase { + + private EsRestClient client; + + // json response for `{index}/_mapping` API + private String jsonMapping; + + private boolean includeTypeName = false; + + public MappingPhase(EsRestClient client) { + this.client = client; } - - public Map<String, String> getFieldsContext() { - return fieldsContext; + + @Override + public void preProcess(SearchContext context) { + if (context.version() != null && context.version().onOrAfter(EsMajorVersion.V_7_X)) { + includeTypeName = true; + } + } + + @Override + public void execute(SearchContext context) throws DorisEsException { + jsonMapping = client.getMapping(context.sourceIndex(), includeTypeName); } - - public Map<String, String> getDocValueContext() { - return docValueContext; + + @Override + public void postProcess(SearchContext context) { + resolveFields(context, jsonMapping); } - + + /** * Parse the required field information from the json - * @param colList table column - * 
@param indexName indexName(alias or really name) - * @param indexMapping the return value of _mapping - * @param docType The docType used by the index - * @return fieldsContext and docValueContext + * + * @param searchContext the current associated column searchContext + * @param indexMapping the return value of _mapping + * @return fetchFieldsContext and docValueFieldsContext * @throws Exception */ - public static EsFieldInfos fromMapping(List<Column> colList, String indexName, String indexMapping, String docType) throws DorisEsException { + public void resolveFields(SearchContext searchContext, String indexMapping) throws DorisEsException { JSONObject jsonObject = new JSONObject(indexMapping); // the indexName use alias takes the first mapping Iterator<String> keys = jsonObject.keys(); String docKey = keys.next(); JSONObject docData = jsonObject.optJSONObject(docKey); - //{ - // "mappings": { - // "doc": { - // "dynamic": "strict", - // "properties": { - // "time": { - // "type": "long" - // }, - // "type": { - // "type": "keyword" - // }, - // "userId": { - // "type": "text", - // "fields": { - // "keyword": { - // "type": "keyword" - // } - // } - // } - // } - // } - // } - //} JSONObject mappings = docData.optJSONObject("mappings"); - JSONObject rootSchema = mappings.optJSONObject(docType); + JSONObject rootSchema = mappings.optJSONObject(searchContext.type()); JSONObject properties; - // no type in es7 + // After (include) 7.x, type was removed from ES mapping, default type is `_doc` + // https://www.elastic.co/guide/en/elasticsearch/reference/7.0/removal-of-types.html if (rootSchema == null) { + if (searchContext.type().equals("_doc") == false) { + throw new DorisEsException("index[" + searchContext.sourceIndex() + "]'s type must be exists, " + + " and after ES7.x type must be `_doc`, but found [" + + searchContext.type() + "], for table [" + + searchContext.esTable().getName() + "]"); + } properties = mappings.optJSONObject("properties"); } else { 
properties = rootSchema.optJSONObject("properties"); } if (properties == null) { - throw new DorisEsException( "index[" + indexName + "] type[" + docType + "] mapping not found for the Elasticsearch Cluster"); - } - return parseProperties(colList, properties); - } - - // get fields information in properties - private static EsFieldInfos parseProperties(List<Column> colList, JSONObject properties) { - if (properties == null) { - return null; + throw new DorisEsException("index[" + searchContext.sourceIndex() + "] type[" + searchContext.type() + "] mapping not found for the ES Cluster"); } - Map<String, String> fieldsMap = new HashMap<>(); - Map<String, String> docValueMap = new HashMap<>(); - for (Column col : colList) { + for (Column col : searchContext.columns()) { String colName = col.getName(); + // if column exists in Doris Table but no found in ES's mapping, we choose to ignore this situation? if (!properties.has(colName)) { continue; } JSONObject fieldObject = properties.optJSONObject(colName); - String keywordField = getKeywordField(fieldObject, colName); - if (StringUtils.isNotEmpty(keywordField)) { - fieldsMap.put(colName, keywordField); - } - String docValueField = getDocValueField(fieldObject, colName); - if (StringUtils.isNotEmpty(docValueField)) { - docValueMap.put(colName, docValueField); - } + + resolveKeywordFields(searchContext, fieldObject, colName); + resolveDocValuesFields(searchContext, fieldObject, colName); } - return new EsFieldInfos(fieldsMap, docValueMap); } // get a field of keyword type in the fields - private static String getKeywordField(JSONObject fieldObject, String colName) { + private void resolveKeywordFields(SearchContext searchContext, JSONObject fieldObject, String colName) { String fieldType = fieldObject.optString("type"); // string-type field used keyword type to generate predicate // if text field type seen, we should use the `field` keyword type? 
@@ -145,15 +117,14 @@ public class EsFieldInfos { JSONObject innerTypeObject = fieldsObject.optJSONObject(key); // just for text type if ("keyword".equals(innerTypeObject.optString("type"))) { - return colName + "." + key; + searchContext.fetchFieldsContext().put(colName, colName + "." + key); } } } } - return null; } - - private static String getDocValueField(JSONObject fieldObject, String colName) { + + private void resolveDocValuesFields(SearchContext searchContext, JSONObject fieldObject, String colName) { String fieldType = fieldObject.optString("type"); String docValueField = null; if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) { @@ -175,16 +146,16 @@ public class EsFieldInfos { } } } - return docValueField; - } - // set doc_value = false manually - if (fieldObject.has("doc_values")) { - boolean docValue = fieldObject.optBoolean("doc_values"); - if (!docValue) { - return docValueField; + } else { + // set doc_value = false manually + if (fieldObject.has("doc_values")) { + boolean docValue = fieldObject.optBoolean("doc_values"); + if (!docValue) { + return; + } } + docValueField = colName; } - docValueField = colName; - return docValueField; + searchContext.docValueFieldsContext().put(colName, docValueField); } } diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java new file mode 100644 index 0000000..de1bb76 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.elasticsearch; + +import org.apache.doris.catalog.EsTable; + +import java.util.Map; + +/** + * Fetch resolved indices's search shards from remote ES Cluster + */ +public class PartitionPhase implements SearchPhase { + + private EsRestClient client; + private EsShardPartitions shardPartitions; + private Map<String, EsNodeInfo> nodesInfo; + + public PartitionPhase(EsRestClient client) { + this.client = client; + } + + @Override + public void execute(SearchContext context) throws DorisEsException { + shardPartitions = client.searchShards(context.sourceIndex()); + nodesInfo = client.getHttpNodes(); + } + + + @Override + public void postProcess(SearchContext context) throws DorisEsException { + context.partitions(shardPartitions); + if (EsTable.TRANSPORT_HTTP.equals(context.esTable().getTransport())) { + context.partitions().addHttpAddress(nodesInfo); + } + } +} diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java new file mode 100644 index 0000000..06b4c7d --- /dev/null +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/SearchContext.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.elasticsearch; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.EsTable; + +import com.google.common.collect.Maps; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * This class encapsulates the state needed to execute a query on ES table such as fields、doc_values、resolved index、 + * search shards etc. + * Since then, we would add more state or runtime information to this class such as + * query builder、slice scroll context、aggregation info etc. 
+ **/ +public class SearchContext { + + private static final Logger LOG = LogManager.getLogger(SearchContext.class); + + // fetch string field value from not analyzed fields : userId => userId.keyword + // this is activated when `enable_keyword_sniff = true` + private Map<String, String> fetchFieldsContext = Maps.newHashMap(); + // used to indicate which fields can get from ES docavalue + // because elasticsearch can have "fields" feature, field can have + // two or more types, the first type maybe have not docvalue but other + // can have, such as (text field not have docvalue, but keyword can have): + // "properties": { + // "city": { + // "type": "text", + // "fields": { + // "raw": { + // "type": "keyword" + // } + // } + // } + // } + // then the docvalue context provided the mapping between the select field and real request field : + // {"city": "city.raw"} + // use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw` + // fetch field value from doc_values, this is activated when `enable_docvalue_scan= true` + private Map<String, String> docValueFieldsContext = Maps.newHashMap(); + + // sourceIndex is the name of index when creating ES external table + private final String sourceIndex; + + // when the `sourceIndex` is `alias` or `wildcard` matched index, this maybe involved two or more indices + // `resolvedIndices` would return the matched underlying indices + private List<String> resolvedIndices = Collections.emptyList(); + + // `type` of the `sourceIndex` + private final String type; + + + private EsTable table; + + // all columns which user created for ES external table + private final List<Column> fullSchema; + + // represent `resolvedIndices`'s searchable shards + private EsShardPartitions shardPartitions; + + // the ES cluster version + private EsMajorVersion version; + + + public SearchContext(EsTable table) { + this.table = table; + fullSchema = table.getFullSchema(); + sourceIndex = 
table.getIndexName(); + type = table.getMappingType(); + } + + + public String sourceIndex() { + return sourceIndex; + } + + public List<String> resolvedIndices() { + return resolvedIndices; + } + + + public String type() { + return type; + } + + public List<Column> columns() { + return fullSchema; + } + + public EsTable esTable() { + return table; + } + + public Map<String, String> fetchFieldsContext() { + return fetchFieldsContext; + } + + public Map<String, String> docValueFieldsContext() { + return docValueFieldsContext; + } + + public void version(EsMajorVersion version) { + this.version = version; + } + + public EsMajorVersion version() { + return version; + } + + public void partitions(EsShardPartitions shardPartitions) { + this.shardPartitions = shardPartitions; + } + + public EsShardPartitions partitions() { + return shardPartitions; + } + + // this will be refactor soon + public EsTablePartitions tablePartitions() throws Exception { + return EsTablePartitions.fromShardPartitions(table, shardPartitions); + } +} diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/SearchPhase.java similarity index 61% copy from fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java copy to fe/src/main/java/org/apache/doris/external/elasticsearch/SearchPhase.java index c1ea1f4..928524d 100644 --- a/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/SearchPhase.java @@ -17,13 +17,26 @@ package org.apache.doris.external.elasticsearch; -import org.apache.doris.common.UserException; -public class DorisEsException extends UserException { +/** + * Represents a phase of a ES fetch index metadata request e.g. 
get mapping, get shard location etc through network + */ +public interface SearchPhase { - private static final long serialVersionUID = 7912833584319374692L; + /** + * Performs pre processing of the search context before the execute. + */ + default void preProcess(SearchContext context) { + } + + /** + * Executes the search phase + */ + void execute(SearchContext context); - public DorisEsException(String msg) { - super(msg); + /** + * Performs post processing of the search context before the execute. + */ + default void postProcess(SearchContext context) { } } diff --git a/fe/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java b/fe/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java new file mode 100644 index 0000000..8d4e911 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/VersionPhase.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.elasticsearch; + +/** + * Request version from remote ES Cluster. 
If request fails, set the version with `LATEST` + */ +public class VersionPhase implements SearchPhase { + + private EsRestClient client; + + private boolean isVersionSet = false; + + + public VersionPhase(EsRestClient client) { + this.client = client; + } + + @Override + public void preProcess(SearchContext context) { + if (context.esTable().esVersion() != null) { + isVersionSet = true; + context.version(context.esTable().esVersion()); + } + } + + @Override + public void execute(SearchContext context) { + if (isVersionSet) { + return; + } + EsMajorVersion version; + try { + version = client.version(); + } catch (Throwable e) { + version = EsMajorVersion.LATEST; + } + context.version(version); + } +} diff --git a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java deleted file mode 100644 index c367ec0..0000000 --- a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java +++ /dev/null @@ -1,128 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.external.elasticsearch; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import org.apache.doris.catalog.Catalog; -import org.apache.doris.catalog.CatalogTestUtil; -import org.apache.doris.catalog.EsTable; -import org.apache.doris.catalog.FakeCatalog; -import org.apache.doris.catalog.FakeEditLog; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeMetaVersion; -import org.apache.doris.meta.MetaContext; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.lang.reflect.InvocationTargetException; -import java.net.URISyntaxException; - -public class EsRepositoryTest { - - private static FakeEditLog fakeEditLog; - private static FakeCatalog fakeCatalog; - private static Catalog masterCatalog; - private static String mappingsStr = ""; - private static String es7MappingsStr = ""; - private static String searchShardsStr = ""; - private EsRepository esRepository; - private EsRestClient fakeClient; - - @BeforeClass - public static void init() throws IOException, InstantiationException, IllegalAccessException, - IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException, - URISyntaxException { - fakeEditLog = new FakeEditLog(); - fakeCatalog = new FakeCatalog(); - masterCatalog = CatalogTestUtil.createTestCatalog(); - MetaContext metaContext = new MetaContext(); - metaContext.setMetaVersion(FeMetaVersion.VERSION_40); - metaContext.setThreadLocalInfo(); - // masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40); - FakeCatalog.setCatalog(masterCatalog); - mappingsStr = loadJsonFromFile("data/es/mappings.json"); - es7MappingsStr = loadJsonFromFile("data/es/es7_mappings.json"); - searchShardsStr = 
loadJsonFromFile("data/es/search_shards.json"); - } - - @Before - public void setUp() { - esRepository = new EsRepository(); - fakeClient = new EsRestClient(new String[]{"localhost:9200"}, null, null); - } - - @Test - public void testSetEsTableContext() throws Exception { - EsTable esTable = (EsTable) Catalog.getCurrentCatalog() - .getDb(CatalogTestUtil.testDb1) - .getTable(CatalogTestUtil.testEsTableId1); - // es5 - EsFieldInfos fieldInfos = EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), mappingsStr, esTable.getMappingType()); - esTable.addFieldInfos(fieldInfos); - assertEquals("userId.keyword", esTable.fieldsContext().get("userId")); - assertEquals("userId.keyword", esTable.docValueContext().get("userId")); - // es7 - EsFieldInfos fieldInfos7 = EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), es7MappingsStr, ""); - assertEquals("userId.keyword", fieldInfos7.getFieldsContext().get("userId")); - assertEquals("userId.keyword", fieldInfos7.getDocValueContext().get("userId")); - - } - - @Test(expected = DorisEsException.class) - public void testSetErrorType() throws Exception { - EsTable esTable = (EsTable) Catalog.getCurrentCatalog() - .getDb(CatalogTestUtil.testDb1) - .getTable(CatalogTestUtil.testEsTableId1); - // error type - EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), mappingsStr, "errorType"); - } - - @Test - public void testSetTableState() throws DorisEsException, DdlException { - EsTable esTable = (EsTable) Catalog.getCurrentCatalog() - .getDb(CatalogTestUtil.testDb1) - .getTable(CatalogTestUtil.testEsTableId1); - EsShardPartitions esShardPartitions = EsShardPartitions.findShardPartitions(esTable.getIndexName(), searchShardsStr); - EsTablePartitions esTablePartitions = EsTablePartitions.fromShardPartitions(esTable, esShardPartitions); - assertNotNull(esTablePartitions); - assertEquals(1, esTablePartitions.getUnPartitionedIndexStates().size()); - assertEquals(5, 
esTablePartitions.getEsShardPartitions("indexa").getShardRoutings().size()); - } - - private static String loadJsonFromFile(String fileName) throws IOException, URISyntaxException { - File file = new File(EsRepositoryTest.class.getClassLoader().getResource(fileName).toURI()); - InputStream is = new FileInputStream(file); - BufferedReader br = new BufferedReader(new InputStreamReader(is)); - StringBuilder jsonStr = new StringBuilder(); - String line = ""; - while ((line = br.readLine()) != null) { - jsonStr.append(line); - } - br.close(); - is.close(); - return jsonStr.toString(); - } -} \ No newline at end of file diff --git a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java index b611ae4..21d4909 100644 --- a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java +++ b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java @@ -27,38 +27,38 @@ import org.junit.Test; public class EsUtilTest { - private String jsonStr = "{\"settings\": {\n" - + " \"index\": {\n" - + " \"bpack\": {\n" - + " \"partition\": {\n" - + " \"upperbound\": \"12\"\n" - + " }\n" - + " },\n" - + " \"number_of_shards\": \"5\",\n" - + " \"provided_name\": \"indexa\",\n" - + " \"creation_date\": \"1539328532060\",\n" - + " \"number_of_replicas\": \"1\",\n" - + " \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n" - + " \"version\": {\n" - + " \"created\": \"5050099\"\n" - + " }\n" - + " }\n" + private String jsonStr = "{\"settings\": {\n" + + " \"index\": {\n" + + " \"bpack\": {\n" + + " \"partition\": {\n" + + " \"upperbound\": \"12\"\n" + + " }\n" + + " },\n" + + " \"number_of_shards\": \"5\",\n" + + " \"provided_name\": \"indexa\",\n" + + " \"creation_date\": \"1539328532060\",\n" + + " \"number_of_replicas\": \"1\",\n" + + " \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n" + + " \"version\": {\n" + + " \"created\": \"5050099\"\n" + + " }\n" + + " }\n" + " }}"; - + @Test public 
void testGetJsonObject() { JSONObject json = new JSONObject(jsonStr); JSONObject upperBoundSetting = EsUtil.getJsonObject(json, "settings.index.bpack.partition", 0); assertTrue(upperBoundSetting.has("upperbound")); assertEquals("12", upperBoundSetting.getString("upperbound")); - + JSONObject unExistKey = EsUtil.getJsonObject(json, "set", 0); assertNull(unExistKey); - + JSONObject singleKey = EsUtil.getJsonObject(json, "settings", 0); assertTrue(singleKey.has("index")); } - + @Test(expected = JSONException.class) public void testGetJsonObjectWithException() { JSONObject json = new JSONObject(jsonStr); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org