wuyunfeng commented on a change in pull request #3454:
URL: https://github.com/apache/incubator-doris/pull/3454#discussion_r442691124



##########
File path: 
fe/src/main/java/org/apache/doris/external/elasticsearch/EsStateStore.java
##########
@@ -124,12 +126,9 @@ public void loadTableFromCatalog() {
             }
         }
     }
-
-    @VisibleForTesting
-    public EsTableState getTableState(String responseStr, EsTable esTable)
-            throws DdlException, AnalysisException, 
ExternalDataSourceException {
-        JSONObject jsonObject = new JSONObject(responseStr);
-        JSONObject nodesMap = jsonObject.getJSONObject("nodes");
+    
+    // Configure keyword and doc_values by mapping
+    public void setEsTableContext(EsFieldInfos fieldInfos, EsTable esTable) {

Review comment:
       Can we remove or move this setXXX from EsStateStore?

##########
File path: 
fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
##########
@@ -17,118 +17,109 @@
 
 package org.apache.doris.external.elasticsearch;
 
+import org.apache.doris.catalog.Column;
+import org.apache.http.HttpHeaders;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.util.Strings;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
 import okhttp3.Credentials;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
 
 public class EsRestClient {
+    
     private static final Logger LOG = LogManager.getLogger(EsRestClient.class);
     private ObjectMapper mapper;
-
+    
     {
         mapper = new ObjectMapper();
         mapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false);
         mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false);
     }
-
+    
     private static OkHttpClient networkClient = new OkHttpClient.Builder()
             .readTimeout(10, TimeUnit.SECONDS)
             .build();
-
-    private String basicAuth;
-
-    private int nextClient = 0;
+    
+    private Request.Builder builder;
     private String[] nodes;
     private String currentNode;
-
+    private int currentNodeIndex = 0;
+    
     public EsRestClient(String[] nodes, String authUser, String authPassword) {
         this.nodes = nodes;
+        this.builder = new Request.Builder();
         if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) {
-            basicAuth = Credentials.basic(authUser, authPassword);
+            this.builder.addHeader(HttpHeaders.AUTHORIZATION,
+                    Credentials.basic(authUser, authPassword));
         }
-        selectNextNode();
+        this.currentNode = nodes[currentNodeIndex];
     }
-
-    private boolean selectNextNode() {
-        if (nextClient >= nodes.length) {
-            return false;
+    
+    private void selectNextNode() {
+        currentNodeIndex++;
+        // reroute, because the previously failed node may have already been 
restored
+        if (currentNodeIndex >= nodes.length) {
+            currentNodeIndex = 0;
         }
-        currentNode = nodes[nextClient++];
-        return true;
+        currentNode = nodes[currentNodeIndex];
     }
-
+    
     public Map<String, EsNodeInfo> getHttpNodes() throws Exception {
         Map<String, Map<String, Object>> nodesData = get("_nodes/http", 
"nodes");
         if (nodesData == null) {
             return Collections.emptyMap();
         }
-        Map<String, EsNodeInfo> nodes = new HashMap<>();
+        Map<String, EsNodeInfo> nodesMap = new HashMap<>();
         for (Map.Entry<String, Map<String, Object>> entry : 
nodesData.entrySet()) {
             EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue());
             if (node.hasHttp()) {
-                nodes.put(node.getId(), node);
+                nodesMap.put(node.getId(), node);
             }
         }
-        return nodes;
+        return nodesMap;
     }
-
-    public String getIndexMetaData(String indexName) throws Exception {
-        String path = "_cluster/state?indices=" + indexName
-                + "&metric=routing_table,nodes,metadata&expand_wildcards=open";
-        return execute(path);
-
+    
+    public EsFieldInfos getFieldInfo(String indexName, String mappingType, 
List<Column> colList) throws Exception {

Review comment:
       ```suggestion
       public EsFieldInfos getFieldInfos(String indexName, String mappingType, 
List<Column> colList) throws Exception {
   ```

##########
File path: 
fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
##########
@@ -76,66 +102,15 @@ public TNetworkAddress randomAddress(Map<String, 
EsNodeInfo> nodesInfo) {
         EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray();
         return nodeInfos[seed].getPublishAddress();
     }
-    
-    public static EsIndexState parseIndexStateV55(String indexName, JSONObject 
indicesRoutingMap, 
-            JSONObject nodesMap, 
-            JSONObject indicesMetaMap, PartitionInfo partitionInfo) throws 
AnalysisException {
-        EsIndexState indexState = new EsIndexState(indexName);
-        JSONObject shardRoutings = 
indicesRoutingMap.getJSONObject(indexName).getJSONObject("shards");
-        for (String shardKey : shardRoutings.keySet()) {
-            List<EsShardRouting> singleShardRouting = Lists.newArrayList();
-            JSONArray shardRouting = shardRoutings.getJSONArray(shardKey);
-            for (int i = 0; i < shardRouting.length(); ++i) {
-                JSONObject shard = shardRouting.getJSONObject(i);
-                String shardState = shard.getString("state");
-                if ("STARTED".equalsIgnoreCase(shardState)) {
-                    try {
-                        
singleShardRouting.add(EsShardRouting.parseShardRoutingV55(shardState, 
-                                shardKey, shard, nodesMap));
-                    } catch (Exception e) {
-                        LOG.info("errors while parse shard routing from json 
[{}], ignore this shard", shard.toString(), e);
-                    }
-                } 
-            }
-            if (singleShardRouting.isEmpty()) {
-                LOG.warn("could not find a healthy allocation for [{}][{}]", 
indexName, shardKey);
-            }
-            indexState.addShardRouting(Integer.valueOf(shardKey), 
singleShardRouting);
-        }
 
-        // get some meta info from es, could be used to prune index when query
-        // index.bpack.partition.upperbound: stu_age
-        if (partitionInfo != null && partitionInfo instanceof 
RangePartitionInfo) {
-            JSONObject indexMeta = indicesMetaMap.getJSONObject(indexName);
-            JSONObject partitionSetting = EsUtil.getJsonObject(indexMeta, 
"settings.index.bpack.partition", 0);
-            LOG.debug("index {} range partition setting is {}", indexName, 
-                    partitionSetting == null ? "" : 
partitionSetting.toString());
-            if (partitionSetting != null && 
partitionSetting.has("upperbound")) {
-                String upperBound = partitionSetting.getString("upperbound");
-                List<PartitionValue> upperValues = Lists.newArrayList(new 
PartitionValue(upperBound));
-                PartitionKeyDesc partitionKeyDesc = new 
PartitionKeyDesc(upperValues);
-                // use index name as partition name
-                SingleRangePartitionDesc desc = new 
SingleRangePartitionDesc(false, 
-                        indexName, partitionKeyDesc, null);
-                PartitionKey partitionKey = PartitionKey.createPartitionKey(
-                        desc.getPartitionKeyDesc().getUpperValues(), 
-                        ((RangePartitionInfo) 
partitionInfo).getPartitionColumns());
-                desc.analyze(((RangePartitionInfo) 
partitionInfo).getPartitionColumns().size(), null);
-                indexState.setPartitionDesc(desc);
-                indexState.setPartitionKey(partitionKey);
-            }
-        }
-        return indexState;
-    }
-    
     public void addShardRouting(int shardId, List<EsShardRouting> 
singleShardRouting) {
         shardRoutings.put(shardId, singleShardRouting);
     }
-    
+
     public String getIndexName() {

Review comment:
       This method may not be needed; if it isn't, can you delete it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to