This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a06625ee6b6 [feature](ES Catalog)Add FE open API for ES Catalog 
(#40444)
a06625ee6b6 is described below

commit a06625ee6b626ce95d9e665f8cdd279ea153096b
Author: qiye <jianliang5...@gmail.com>
AuthorDate: Mon Sep 9 10:20:38 2024 +0800

    [feature](ES Catalog)Add FE open API for ES Catalog (#40444)
    
    Add APIs to fetch information from Elasticsearch, which is very useful for debugging.
    1. Get index mapping API
    ```
    GET /rest/v2/api/es_catalog/get_mapping?catalog=xxx&table=xxx
    ```
    
    2. Search index API
    ```
    POST /rest/v2/api/es_catalog/search?catalog=xxx&table=xxx
    {
        request_body
        ...
    }
    ```
---
 .../apache/doris/datasource/es/EsRestClient.java   |  36 ++++++-
 .../doris/httpv2/restv2/ESCatalogAction.java       | 108 +++++++++++++++++++++
 .../plugins/plugin_curl_requester.groovy           |  16 +--
 .../es/test_es_catalog_http_open_api.groovy        |  84 ++++++++++++++++
 4 files changed, 233 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java
index 21a85126c9e..e2c7fa0b688 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java
@@ -24,8 +24,10 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.collect.ImmutableList;
 import okhttp3.Credentials;
+import okhttp3.MediaType;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
+import okhttp3.RequestBody;
 import okhttp3.Response;
 import org.apache.http.HttpHeaders;
 import org.apache.logging.log4j.LogManager;
@@ -126,12 +128,27 @@ public class EsRestClient {
         return indexMapping;
     }
 
+    /**
+     * Search specific index
+     */
+    public String searchIndex(String indexName, String body) throws 
DorisEsException {
+        String path = indexName + "/_search";
+        RequestBody requestBody = null;
+        if (Strings.isNotEmpty(body)) {
+            requestBody = RequestBody.create(
+                body,
+                MediaType.get("application/json")
+            );
+        }
+        return executeWithRequestBody(path, requestBody);
+    }
+
     /**
      * Check whether index exist.
      **/
     public boolean existIndex(OkHttpClient httpClient, String indexName) {
         String path = indexName + "/_mapping";
-        try (Response response = executeResponse(httpClient, path)) {
+        try (Response response = executeResponse(httpClient, path, null)) {
             if (response.isSuccessful()) {
                 return true;
             }
@@ -228,7 +245,7 @@ public class EsRestClient {
         return sslNetworkClient;
     }
 
-    private Response executeResponse(OkHttpClient httpClient, String path) 
throws IOException {
+    private Response executeResponse(OkHttpClient httpClient, String path, 
RequestBody requestBody) throws IOException {
         currentNode = currentNode.trim();
         if (!(currentNode.startsWith("http://") || 
currentNode.startsWith("https://"))) {
             currentNode = "http://" + currentNode;
@@ -239,7 +256,12 @@ public class EsRestClient {
         String url = currentNode + path;
         try {
             SecurityChecker.getInstance().startSSRFChecking(url);
-            Request request = builder.get().url(currentNode + path).build();
+            Request request;
+            if (requestBody != null) {
+                request = builder.post(requestBody).url(currentNode + 
path).build();
+            } else {
+                request = builder.get().url(currentNode + path).build();
+            }
             if (LOG.isInfoEnabled()) {
                 LOG.info("es rest client request URL: {}", 
request.url().toString());
             }
@@ -251,13 +273,17 @@ public class EsRestClient {
         }
     }
 
+    private String execute(String path) throws DorisEsException {
+        return executeWithRequestBody(path, null);
+    }
+
     /**
      * execute request for specific path,it will try again nodes.length times 
if it fails
      *
      * @param path the path must not leading with '/'
      * @return response
      */
-    private String execute(String path) throws DorisEsException {
+    private String executeWithRequestBody(String path, RequestBody 
requestBody) throws DorisEsException {
         // try 3 times for every node
         int retrySize = nodes.length * 3;
         DorisEsException scratchExceptionForThrow = null;
@@ -277,7 +303,7 @@ public class EsRestClient {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("es rest client request URL: {}", currentNode + "/" 
+ path);
             }
-            try (Response response = executeResponse(httpClient, path)) {
+            try (Response response = executeResponse(httpClient, path, 
requestBody)) {
                 if (response.isSuccessful()) {
                     return response.body().string();
                 } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ESCatalogAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ESCatalogAction.java
new file mode 100644
index 00000000000..1a075c79782
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ESCatalogAction.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.restv2;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.JsonUtil;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.es.EsExternalCatalog;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.httpv2.rest.RestBaseController;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+@RestController
+@RequestMapping("/rest/v2/api/es_catalog")
+public class ESCatalogAction extends RestBaseController {
+
+    private static final Logger LOG = 
LogManager.getLogger(ESCatalogAction.class);
+    private static final String CATALOG = "catalog";
+    private static final String TABLE = "table";
+
+    private Object handleRequest(HttpServletRequest request, 
HttpServletResponse response,
+            BiFunction<EsExternalCatalog, String, String> action) {
+        if (Config.enable_all_http_auth) {
+            executeCheckPassword(request, response);
+        }
+
+        try {
+            if (!Env.getCurrentEnv().isMaster()) {
+                return redirectToMasterOrException(request, response);
+            }
+        } catch (Exception e) {
+            return ResponseEntityBuilder.okWithCommonError(e.getMessage());
+        }
+
+        Map<String, Object> resultMap = Maps.newHashMap();
+        Env env = Env.getCurrentEnv();
+        String catalogName = request.getParameter(CATALOG);
+        String tableName = request.getParameter(TABLE);
+        CatalogIf catalog = env.getCatalogMgr().getCatalog(catalogName);
+        if (!(catalog instanceof EsExternalCatalog)) {
+            return ResponseEntityBuilder.badRequest("unknown ES Catalog: " + 
catalogName);
+        }
+        EsExternalCatalog esExternalCatalog = (EsExternalCatalog) catalog;
+        esExternalCatalog.makeSureInitialized();
+        String result = action.apply(esExternalCatalog, tableName);
+        ObjectNode jsonResult = JsonUtil.parseObject(result);
+
+        resultMap.put("catalog", catalogName);
+        resultMap.put("table", tableName);
+        resultMap.put("result", jsonResult);
+
+        return ResponseEntityBuilder.ok(resultMap);
+    }
+
+    @RequestMapping(path = "/get_mapping", method = RequestMethod.GET)
+    public Object getMapping(HttpServletRequest request, HttpServletResponse 
response) {
+        return handleRequest(request, response, (esExternalCatalog, tableName) 
->
+            esExternalCatalog.getEsRestClient().getMapping(tableName));
+    }
+
+    @RequestMapping(path = "/search", method = RequestMethod.POST)
+    public Object search(HttpServletRequest request, HttpServletResponse 
response) {
+        String body;
+        try {
+            body = getRequestBody(request);
+        } catch (IOException e) {
+            return ResponseEntityBuilder.okWithCommonError(e.getMessage());
+        }
+        return handleRequest(request, response, (esExternalCatalog, tableName) 
->
+            esExternalCatalog.getEsRestClient().searchIndex(tableName, body));
+    }
+
+    private String getRequestBody(HttpServletRequest request) throws 
IOException {
+        BufferedReader reader = request.getReader();
+        return 
reader.lines().collect(Collectors.joining(System.lineSeparator()));
+    }
+}
diff --git a/regression-test/plugins/plugin_curl_requester.groovy 
b/regression-test/plugins/plugin_curl_requester.groovy
index 15affb45364..62b7433f37b 100644
--- a/regression-test/plugins/plugin_curl_requester.groovy
+++ b/regression-test/plugins/plugin_curl_requester.groovy
@@ -111,14 +111,12 @@ Suite.metaClass.http_client = { String method, String url 
/* param */ ->
 
 logger.info("Added 'http_client' function to Suite")
 
-Suite.metaClass.curl = { String method, String url /* param */-> 
+Suite.metaClass.curl = { String method, String url, String body = null /* 
param */-> 
     Suite suite = delegate as Suite
-    if (method != "GET" && method != "POST")
-    {
+    if (method != "GET" && method != "POST") {
         throw new Exception(String.format("invalid curl method: %s", method))
     }
-    if (url.isBlank())
-    {
+    if (url.isBlank()) {
         throw new Exception("invalid curl url, blank")
     }
     
@@ -127,7 +125,13 @@ Suite.metaClass.curl = { String method, String url /* 
param */->
     Integer retryCount = 0; // Current retry count
     Integer sleepTime = 5000; // Sleep time in milliseconds
 
-    String cmd = String.format("curl --max-time %d -X %s %s", timeout, method, 
url).toString()
+    String cmd
+    if (method == "POST" && body != null) {
+        cmd = String.format("curl --max-time %d -X %s -H 
Content-Type:application/json -d %s %s", timeout, method, body, url).toString()
+    } else {
+        cmd = String.format("curl --max-time %d -X %s %s", timeout, method, 
url).toString()
+    }
+    
     logger.info("curl cmd: " + cmd)
     def process
     int code
diff --git 
a/regression-test/suites/external_table_p0/es/test_es_catalog_http_open_api.groovy
 
b/regression-test/suites/external_table_p0/es/test_es_catalog_http_open_api.groovy
new file mode 100644
index 00000000000..485cdd06186
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/es/test_es_catalog_http_open_api.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_es_catalog_http_open_api", 
"p0,external,es,external_docker,external_docker_es") {
+    String enabled = context.config.otherConfigs.get("enableEsTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String es_5_port = context.config.otherConfigs.get("es_5_port")
+        String es_6_port = context.config.otherConfigs.get("es_6_port")
+        String es_7_port = context.config.otherConfigs.get("es_7_port")
+        String es_8_port = context.config.otherConfigs.get("es_8_port")
+
+        // test old create-catalog syntax for compatibility
+        sql """
+            create catalog if not exists test_es_query_es5
+            properties (
+                "type"="es",
+                "elasticsearch.hosts"="http://${externalEnvIp}:$es_5_port",
+                "elasticsearch.nodes_discovery"="false",
+                "elasticsearch.keyword_sniff"="true"
+            );
+        """
+        sql """
+            create catalog if not exists test_es_query_es6
+            properties (
+                "type"="es",
+                "elasticsearch.hosts"="http://${externalEnvIp}:$es_6_port",
+                "elasticsearch.nodes_discovery"="false",
+                "elasticsearch.keyword_sniff"="true"
+            );
+        """
+
+        // test new create catalog syntax
+        sql """create catalog if not exists test_es_query_es7 properties(
+            "type"="es",
+            "hosts"="http://${externalEnvIp}:$es_7_port",
+            "nodes_discovery"="false",
+            "enable_keyword_sniff"="true"
+        );
+        """
+
+        sql """create catalog if not exists test_es_query_es8 properties(
+            "type"="es",
+            "hosts"="http://${externalEnvIp}:$es_8_port",
+            "nodes_discovery"="false",
+            "enable_keyword_sniff"="true"
+        );
+        """
+
+        List<String> feHosts = getFrontendIpHttpPort()
+        // for each catalog 5..8, send a request
+        for (int i = 5; i <= 8; i++) {
+            String catalog = String.format("test_es_query_es%s", i)
+            def (code, out, err) = curl("GET", 
String.format("http://%s/rest/v2/api/es_catalog/get_mapping?catalog=%s&table=test1",
 feHosts[0], catalog))
+            logger.info("Get mapping response: code=" + code + ", out=" + out 
+ ", err=" + err)
+            assertTrue(code == 0)
+            assertTrue(out.toLowerCase().contains("success"))
+            assertTrue(out.toLowerCase().contains("mappings"))
+            assertTrue(out.toLowerCase().contains(catalog))
+
+            String body = 
'{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["test6"],"sort":["_doc"],"size":4064}';
+            def (code1, out1, err1) = curl("POST", 
String.format("http://%s/rest/v2/api/es_catalog/search?catalog=%s&table=test1", 
feHosts[0], catalog), body)
+            logger.info("Search index response: code=" + code1 + ", out=" + 
out1 + ", err=" + err1)
+            assertTrue(code1 == 0)
+            assertTrue(out1.toLowerCase().contains("success"))
+            assertTrue(out1.toLowerCase().contains("hits"))
+            assertTrue(out1.toLowerCase().contains(catalog))
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to