wuyunfeng commented on a change in pull request #3454: URL: https://github.com/apache/incubator-doris/pull/3454#discussion_r421200957
########## File path: fe/src/main/java/org/apache/doris/external/EsIndexState.java ########## @@ -76,66 +68,45 @@ public TNetworkAddress randomAddress(Map<String, EsNodeInfo> nodesInfo) { EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray(); return nodeInfos[seed].getPublishAddress(); } - - public static EsIndexState parseIndexStateV55(String indexName, JSONObject indicesRoutingMap, - JSONObject nodesMap, - JSONObject indicesMetaMap, PartitionInfo partitionInfo) throws AnalysisException { + + public static EsIndexState parseIndexState(String indexName, JSONObject nodesMap, Review comment: EsIndexState may need a better name ########## File path: fe/src/main/java/org/apache/doris/external/EsIndexState.java ########## @@ -76,66 +68,45 @@ public TNetworkAddress randomAddress(Map<String, EsNodeInfo> nodesInfo) { EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray(); return nodeInfos[seed].getPublishAddress(); } - - public static EsIndexState parseIndexStateV55(String indexName, JSONObject indicesRoutingMap, - JSONObject nodesMap, - JSONObject indicesMetaMap, PartitionInfo partitionInfo) throws AnalysisException { + + public static EsIndexState parseIndexState(String indexName, JSONObject nodesMap, Review comment: Can you move this parse method into RestClient? 
such as `getHttpNodes` ########## File path: fe/src/main/java/org/apache/doris/external/EsIndexState.java ########## @@ -76,66 +68,45 @@ public TNetworkAddress randomAddress(Map<String, EsNodeInfo> nodesInfo) { EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray(); return nodeInfos[seed].getPublishAddress(); } - - public static EsIndexState parseIndexStateV55(String indexName, JSONObject indicesRoutingMap, - JSONObject nodesMap, - JSONObject indicesMetaMap, PartitionInfo partitionInfo) throws AnalysisException { + + public static EsIndexState parseIndexState(String indexName, JSONObject nodesMap, + JSONArray shards) { EsIndexState indexState = new EsIndexState(indexName); - JSONObject shardRoutings = indicesRoutingMap.getJSONObject(indexName).getJSONObject("shards"); - for (String shardKey : shardRoutings.keySet()) { + int length = shards.length(); + for (int i = 0; i < length; i++) { List<EsShardRouting> singleShardRouting = Lists.newArrayList(); - JSONArray shardRouting = shardRoutings.getJSONArray(shardKey); - for (int i = 0; i < shardRouting.length(); ++i) { - JSONObject shard = shardRouting.getJSONObject(i); + JSONArray shardsArray = shards.getJSONArray(i); + int arrayLength = shardsArray.length(); + for (int j = 0; j < arrayLength; j++) { + JSONObject shard = shardsArray.getJSONObject(j); String shardState = shard.getString("state"); if ("STARTED".equalsIgnoreCase(shardState)) { Review comment: Maybe take the `relocating` state into account as well? 
########## File path: fe/src/main/java/org/apache/doris/external/EsRestClient.java ########## @@ -49,48 +49,52 @@ .readTimeout(10, TimeUnit.SECONDS) .build(); - private String basicAuth; - - private int nextClient = 0; + private Request.Builder builder; private String[] nodes; private String currentNode; + private int currentNodeIndex = 0; public EsRestClient(String[] nodes, String authUser, String authPassword) { this.nodes = nodes; + this.builder = new Request.Builder(); if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) { - basicAuth = Credentials.basic(authUser, authPassword); + this.builder.addHeader(HttpHeaders.AUTHORIZATION, Credentials.basic(authUser, authPassword)); } - selectNextNode(); + this.currentNode = nodes[currentNodeIndex]; } - private boolean selectNextNode() { - if (nextClient >= nodes.length) { - return false; + private void selectNextNode() { + currentNodeIndex++; + // reroute, because the previously failed node may have already been restored + if (currentNodeIndex >= nodes.length) { + currentNodeIndex = 0; } - currentNode = nodes[nextClient++]; - return true; + currentNode = nodes[currentNodeIndex]; } public Map<String, EsNodeInfo> getHttpNodes() throws Exception { Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes"); if (nodesData == null) { return Collections.emptyMap(); } - Map<String, EsNodeInfo> nodes = new HashMap<>(); + Map<String, EsNodeInfo> nodesMap = new HashMap<>(); for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) { EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); if (node.hasHttp()) { - nodes.put(node.getId(), node); + nodesMap.put(node.getId(), node); } } - return nodes; + return nodesMap; } - public String getIndexMetaData(String indexName) { - String path = "_cluster/state?indices=" + indexName - + "&metric=routing_table,nodes,metadata&expand_wildcards=open"; + public String getIndexMapping(String indexName) { Review comment: it is better to return 
the `top` needed, such as above `Map<String, EsNodeInfo> getHttpNodes()` ########## File path: fe/src/main/java/org/apache/doris/external/EsRestClient.java ########## @@ -49,48 +49,52 @@ .readTimeout(10, TimeUnit.SECONDS) .build(); - private String basicAuth; - - private int nextClient = 0; + private Request.Builder builder; private String[] nodes; private String currentNode; + private int currentNodeIndex = 0; public EsRestClient(String[] nodes, String authUser, String authPassword) { this.nodes = nodes; + this.builder = new Request.Builder(); if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) { - basicAuth = Credentials.basic(authUser, authPassword); + this.builder.addHeader(HttpHeaders.AUTHORIZATION, Credentials.basic(authUser, authPassword)); } - selectNextNode(); + this.currentNode = nodes[currentNodeIndex]; } - private boolean selectNextNode() { - if (nextClient >= nodes.length) { - return false; + private void selectNextNode() { + currentNodeIndex++; + // reroute, because the previously failed node may have already been restored + if (currentNodeIndex >= nodes.length) { + currentNodeIndex = 0; } - currentNode = nodes[nextClient++]; - return true; + currentNode = nodes[currentNodeIndex]; } public Map<String, EsNodeInfo> getHttpNodes() throws Exception { Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes"); if (nodesData == null) { return Collections.emptyMap(); } - Map<String, EsNodeInfo> nodes = new HashMap<>(); + Map<String, EsNodeInfo> nodesMap = new HashMap<>(); for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) { EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue()); if (node.hasHttp()) { - nodes.put(node.getId(), node); + nodesMap.put(node.getId(), node); } } - return nodes; + return nodesMap; } - public String getIndexMetaData(String indexName) { - String path = "_cluster/state?indices=" + indexName - + "&metric=routing_table,nodes,metadata&expand_wildcards=open"; + public String 
getIndexMapping(String indexName) { + String path = indexName + "/_mapping"; return execute(path); + } + public String getSearchShards(String indexName) { Review comment: Same as above ########## File path: fe/src/main/java/org/apache/doris/external/EsStateStore.java ########## @@ -81,30 +68,34 @@ public void deRegisterTable(long tableId) { esTables.remove(tableId); LOG.info("deregister table [{}] from sync list", tableId); } - + @Override protected void runAfterCatalogReady() { for (EsTable esTable : esTables.values()) { try { EsRestClient client = new EsRestClient(esTable.getSeeds(), - esTable.getUserName(), esTable.getPasswd()); - // if user not specify the es version, try to get the remote cluster versoin - // in the future, we maybe need this version - String indexMetaData = client.getIndexMetaData(esTable.getIndexName()); - if (indexMetaData == null) { + esTable.getUserName(), esTable.getPasswd()); + + String indexMapping = client.getIndexMapping(esTable.getIndexName()); + if (indexMapping == null) { continue; } - EsTableState esTableState = parseClusterState55(indexMetaData, esTable); + loadEsIndexMapping(indexMapping, esTable); + + String shardLocation = client.getSearchShards(esTable.getIndexName()); + EsTableState esTableState = loadEsSearchShards(shardLocation, esTable); if (esTableState == null) { continue; } + if (EsTable.TRANSPORT_HTTP.equals(esTable.getTransport())) { Map<String, EsNodeInfo> nodesInfo = client.getHttpNodes(); esTableState.addHttpAddress(nodesInfo); } esTable.setEsTableState(esTableState); Review comment: Maybe we should rename such `XXXTableState` methods? 
########## File path: fe/src/main/java/org/apache/doris/external/EsRestClient.java ########## @@ -137,10 +135,9 @@ private String execute(String path) { if (!(currentNode.startsWith("http://") || currentNode.startsWith("https://"))) { currentNode = "http://" + currentNode; } - Request request = builder.get() - .url(currentNode + "/" + path) - .build(); + .url(currentNode + "/" + path) + .build(); LOG.trace("es rest client request URL: {}", currentNode + "/" + path); Review comment: ```suggestion if (LOG.isTraceEnabled()) { LOG.trace("es rest client request URL: {}", currentNode + "/" + path); } ``` ########## File path: fe/src/main/java/org/apache/doris/external/EsStateStore.java ########## @@ -130,74 +121,30 @@ public void loadTableFromCatalog() { } } - private EsTableState loadEsIndexMetadataV55(final EsTable esTable) { - OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder(); - clientBuilder.authenticator(new Authenticator() { - @Override - public Request authenticate(Route route, Response response) throws IOException { - String credential = Credentials.basic(esTable.getUserName(), esTable.getPasswd()); - return response.request().newBuilder().header("Authorization", credential).build(); - } - }); - String[] seeds = esTable.getSeeds(); - for (String seed : seeds) { - String url = seed + "/_cluster/state?indices=" - + esTable.getIndexName() - + "&metric=routing_table,nodes,metadata&expand_wildcards=open"; - String basicAuth = ""; - try { - Request request = new Request.Builder() - .get() - .url(url) - .addHeader("Authorization", basicAuth) - .build(); - Call call = clientBuilder.build().newCall(request); - Response response = call.execute(); - String responseStr = response.body().string(); - if (response.isSuccessful()) { - try { - EsTableState esTableState = parseClusterState55(responseStr, esTable); - if (esTableState != null) { - return esTableState; - } - } catch (Exception e) { - LOG.warn("errors while parse response msg {}", responseStr, e); - } - } 
else { - LOG.info("errors while call es [{}] to get state info {}", url, responseStr); - } - } catch (Exception e) { - LOG.warn("errors while call es [{}]", url, e); - } - } - return null; - } - - @VisibleForTesting - public EsTableState parseClusterState55(String responseStr, EsTable esTable) - throws DdlException, AnalysisException, ExternalDataSourceException { - JSONObject jsonObject = new JSONObject(responseStr); - String clusterName = jsonObject.getString("cluster_name"); - JSONObject nodesMap = jsonObject.getJSONObject("nodes"); - // we build the doc value context for fields maybe used for scanning - // "properties": { - // "city": { - // "type": "text", // text field does not have docvalue - // "fields": { - // "raw": { - // "type": "keyword" - // } - // } - // } - // } - // then the docvalue context provided the mapping between the select field and real request field : - // {"city": "city.raw"} - JSONObject indicesMetaMap = jsonObject.getJSONObject("metadata").getJSONObject("indices"); - JSONObject indexMetaMap = indicesMetaMap.optJSONObject(esTable.getIndexName()); - if (indexMetaMap != null && (esTable.isKeywordSniffEnable() || esTable.isDocValueScanEnable())) { - JSONObject mappings = indexMetaMap.optJSONObject("mappings"); + // Configure keyword and doc_values by mapping + public void loadEsIndexMapping(String indexMapping, EsTable esTable) { + JSONObject jsonObject = new JSONObject(indexMapping); Review comment: Can we move all `parse json` into one location? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org