This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 863a652011e [fix](ES Catalog)Make sure ES meta is synced before using 
(#46781)
863a652011e is described below

commit 863a652011eaf81c92cddc8f5e76a7103468cf90
Author: qiye <l...@selectdb.com>
AuthorDate: Mon Feb 10 17:48:28 2025 +0800

    [fix](ES Catalog)Make sure ES meta is synced before using (#46781)
    
    ### What problem does this PR solve?
    
    Issue Number: close #46780
    
    Problem Summary:
    
    1. Added calls to `EsResource.fillUrlsWithSchema` and
    `initEsMetaStateTracker` to ensure ES table URLs are correctly parsed and
    the metadata tracker is initialized before use.
    2. Introduced an `executeWithRetry` helper in the ES regression tests to
    retry queries that fail because the ES metadata has not been synced yet.
    3. Removed the unused external ES p2 tests and their regression config.
---
 .../java/org/apache/doris/catalog/EsTable.java     | 18 +++++--
 regression-test/conf/regression-conf.groovy        |  7 ---
 .../external_table_p0/es/test_es_query.groovy      | 30 +++++++++++-
 .../es/test_es_query_no_http_url.groovy            | 31 ++++++++++--
 .../es/test_external_catalog_es.groovy             | 52 --------------------
 .../external_table_p2/es/test_external_es.groovy   | 56 ----------------------
 6 files changed, 72 insertions(+), 122 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
index a3ed061ed48..a9ee65ef74b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -152,17 +152,26 @@ public class EsTable extends Table implements 
GsonPostProcessable {
     }
 
     public Map<String, String> fieldsContext() throws UserException {
+        initEsMetaStateTracker();
         return esMetaStateTracker.searchContext().fetchFieldsContext();
     }
 
     public Map<String, String> docValueContext() throws UserException {
+        initEsMetaStateTracker();
         return esMetaStateTracker.searchContext().docValueFieldsContext();
     }
 
     public List<String> needCompatDateFields() throws UserException {
+        initEsMetaStateTracker();
         return esMetaStateTracker.searchContext().needCompatDateFields();
     }
 
+    private void initEsMetaStateTracker() {
+        if (esMetaStateTracker == null) {
+            esMetaStateTracker = new EsMetaStateTracker(client, this);
+        }
+    }
+
     private void validate(Map<String, String> properties) throws DdlException {
         EsResource.valid(properties, false);
         if (properties.containsKey(EsResource.USER)) {
@@ -315,9 +324,12 @@ public class EsTable extends Table implements 
GsonPostProcessable {
         } else {
             throw new IOException("invalid partition type: " + partType);
         }
+        // parse httpSslEnabled before use it here.
+        EsResource.fillUrlsWithSchema(seeds, httpSslEnabled);
         client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
     }
 
+    @Override
     public void gsonPostProcess() throws IOException {
         hosts = tableContext.get("hosts");
         seeds = hosts.split(",");
@@ -346,6 +358,8 @@ public class EsTable extends Table implements 
GsonPostProcessable {
         includeHiddenIndex = 
Boolean.parseBoolean(tableContext.getOrDefault(EsResource.INCLUDE_HIDDEN_INDEX,
                 EsResource.INCLUDE_HIDDEN_INDEX_DEFAULT_VALUE));
 
+        // parse httpSslEnabled before use it here.
+        EsResource.fillUrlsWithSchema(seeds, httpSslEnabled);
         client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
     }
 
@@ -353,9 +367,7 @@ public class EsTable extends Table implements 
GsonPostProcessable {
      * Sync es index meta from remote ES Cluster.
      */
     public void syncTableMetaData() {
-        if (esMetaStateTracker == null) {
-            esMetaStateTracker = new EsMetaStateTracker(client, this);
-        }
+        initEsMetaStateTracker();
         try {
             esMetaStateTracker.run();
             this.esTablePartitions = 
esMetaStateTracker.searchContext().tablePartitions();
diff --git a/regression-test/conf/regression-conf.groovy 
b/regression-test/conf/regression-conf.groovy
index fd4259fbdd9..c30d04674f5 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -199,13 +199,6 @@ extPgPort = 5432
 extPgUser = "****"
 extPgPassword = "***********"
 
-// elasticsearch external test config for bigdata
-enableExternalEsTest = false
-extEsHost = "***********"
-extEsPort = 9200
-extEsUser = "*******"
-extEsPassword = "***********"
-
 // minio external test config
 enableExternalMinioTest = false
 extMinioHost = "***.**.**.**"
diff --git a/regression-test/suites/external_table_p0/es/test_es_query.groovy 
b/regression-test/suites/external_table_p0/es/test_es_query.groovy
index 1645fa6af51..22789b9ebe1 100644
--- a/regression-test/suites/external_table_p0/es/test_es_query.groovy
+++ b/regression-test/suites/external_table_p0/es/test_es_query.groovy
@@ -28,6 +28,8 @@ suite("test_es_query", 
"p0,external,es,external_docker,external_docker_es") {
         sql """drop catalog if exists test_es_query_es6;"""
         sql """drop catalog if exists test_es_query_es7;"""
         sql """drop catalog if exists test_es_query_es8;"""
+        sql """drop catalog if exists es6_hide;"""
+        sql """drop catalog if exists es7_hide;"""
         sql """drop table if exists test_v1;"""
         sql """drop table if exists test_v2;"""
 
@@ -166,9 +168,35 @@ suite("test_es_query", 
"p0,external,es,external_docker,external_docker_es") {
             );
         """
 
+        def executeWithRetry = { query, queryName, maxRetries ->
+            def retryCount = 0
+            def success = false
+
+            while (!success && retryCount < maxRetries) {
+                try {
+                    sql query
+                    success = true
+                } catch (Exception e) {
+                    if (e.getMessage().contains("EsTable metadata has not been 
synced, Try it later")) {
+                        logger.error("Failed to execute ${queryName}: 
${e.getMessage()}")
+                        logger.info("Retrying... Attempt ${retryCount + 1}")
+                        retryCount++
+                        sleep(1000) // Sleep for 1 second
+                    } else {
+                        throw e // Rethrow if it's a different exception
+                    }
+                }
+            }
+
+            if (!success) {
+                throw new RuntimeException("Failed to execute ${queryName} 
after ${maxRetries} attempts")
+            }
+        }
+
         def query_catalogs = { -> 
             sql """switch internal"""
             sql """use regression_test_external_table_p0_es"""
+            executeWithRetry("""select * from test_v1 where test2='text#1'""", 
"sql01", 30)
             order_qt_sql01 """select * from test_v1 where test2='text#1'"""
             order_qt_sql02 """select * from test_v1 where esquery(test2, 
'{"match":{"test2":"text#1"}}')"""
             order_qt_sql03 """select test4,test5,test6,test7,test8 from 
test_v1 order by test8"""
@@ -182,7 +210,7 @@ suite("test_es_query", 
"p0,external,es,external_docker,external_docker_es") {
             order_qt_sql11 """select test6 from test_v1;"""
             order_qt_sql12 """select test9 from test_v1;"""
             
-            order_qt_sql20 """select * from test_v2 where test2='text#1'"""
+            executeWithRetry("""select * from test_v2 where test2='text#1'""", 
"sql20", 30)
             order_qt_sql21 """select * from test_v2 where esquery(test2, 
'{"match":{"test2":"text#1"}}')"""
             order_qt_sql22 """select test4,test5,test6,test7,test8 from 
test_v2 order by test8"""
             order_qt_sql23 """select * from test_v2 where esquery(c_long, 
'{"term":{"c_long":"-1"}}');"""
diff --git 
a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy 
b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
index 004c1aea31e..f5219c1509e 100644
--- 
a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
+++ 
b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
@@ -23,6 +23,31 @@ suite("test_es_query_no_http_url", 
"p0,external,es,external_docker,external_dock
         String es_7_port = context.config.otherConfigs.get("es_7_port")
         String es_8_port = context.config.otherConfigs.get("es_8_port")
 
+        def executeWithRetry = { query, queryName, maxRetries ->
+            def retryCount = 0
+            def success = false
+
+            while (!success && retryCount < maxRetries) {
+                try {
+                    sql query
+                    success = true
+                } catch (Exception e) {
+                    if (e.getMessage().contains("EsTable metadata has not been 
synced, Try it later")) {
+                        logger.error("Failed to execute ${queryName}: 
${e.getMessage()}")
+                        logger.info("Retrying... Attempt ${retryCount + 1}")
+                        retryCount++
+                        sleep(1000) // Sleep for 1 second
+                    } else {
+                        throw e // Rethrow if it's a different exception
+                    }
+                }
+            }
+
+            if (!success) {
+                throw new RuntimeException("Failed to execute ${queryName} 
after ${maxRetries} attempts")
+            }
+        }
+
         sql """drop catalog if exists es6_no_http_url;"""
         sql """drop catalog if exists es7_no_http_url;"""
         sql """drop catalog if exists es8_no_http_url;"""
@@ -95,9 +120,9 @@ suite("test_es_query_no_http_url", 
"p0,external,es,external_docker,external_dock
                 "http_ssl_enabled"="false"
             );
         """
-        order_qt_sql51 """select * from test_v1_no_http_url where 
test2='text#1'"""
+        executeWithRetry("""select * from test_v1_no_http_url where 
test2='text#1'""", "sql51", 30)
 
-       sql """
+        sql """
             CREATE TABLE `test_v2_no_http_url` (
                 `c_datetime` array<datev2> NULL,
                 `c_long` array<bigint(20)> NULL,
@@ -133,7 +158,7 @@ suite("test_es_query_no_http_url", 
"p0,external,es,external_docker,external_dock
                 "http_ssl_enabled"="false"
             );
         """
-        order_qt_sql52 """select * from test_v2_no_http_url where 
test2='text#1'"""
+        executeWithRetry("""select * from test_v2_no_http_url where 
test2='text#1'""", "sql52", 30)
 
         // es6
         sql """switch es6_no_http_url"""
diff --git 
a/regression-test/suites/external_table_p2/es/test_external_catalog_es.groovy 
b/regression-test/suites/external_table_p2/es/test_external_catalog_es.groovy
deleted file mode 100644
index 5412bc736c7..00000000000
--- 
a/regression-test/suites/external_table_p2/es/test_external_catalog_es.groovy
+++ /dev/null
@@ -1,52 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//import org.postgresql.Driver
-suite("test_external_catalog_es", 
"p2,external,es,external_remote,external_remote_es") {
-    Boolean ignoreP2 = true;
-    if (ignoreP2) {
-        logger.info("disable p2 test");
-        return;
-    }
-
-    String enabled = context.config.otherConfigs.get("enableExternalEsTest")
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
-        String extEsHost = context.config.otherConfigs.get("extEsHost")
-        String extEsPort = context.config.otherConfigs.get("extEsPort")
-        String extEsUser = context.config.otherConfigs.get("extEsUser")
-        String extEsPassword = context.config.otherConfigs.get("extEsPassword")
-        String esCatalogName ="es7_catalog_name"
-
-        String jdbcPg14Table1 = "accounts"
-
-        sql """drop catalog if exists ${esCatalogName}"""
-
-        sql """
-            CREATE CATALOG ${esCatalogName} PROPERTIES (
-                    "type"="es",
-                    "elasticsearch.hosts"="http://${extEsHost}:${extEsPort}";,
-                    "elasticsearch.nodes_discovery"="false",
-                    "elasticsearch.username"="${extEsUser}",
-                    "elasticsearch.password"="${extEsPassword}"
-            );
-            """
-
-        qt_sql "select * from ${esCatalogName}.default_db.${jdbcPg14Table1} 
order by account_number limit 10;"
-
-        sql """drop catalog if exists ${esCatalogName};"""
-
-    }
-}
diff --git 
a/regression-test/suites/external_table_p2/es/test_external_es.groovy 
b/regression-test/suites/external_table_p2/es/test_external_es.groovy
deleted file mode 100644
index fcec9b7de3e..00000000000
--- a/regression-test/suites/external_table_p2/es/test_external_es.groovy
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//import org.postgresql.Driver
-suite("test_external_es", "p2,external,es,external_remote,external_remote_es") 
{
-
-    String enabled = context.config.otherConfigs.get("enableExternalEsTest")
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
-        String extEsHost = context.config.otherConfigs.get("extEsHost")
-        String extEsPort = context.config.otherConfigs.get("extEsPort")
-        String extEsUser = context.config.otherConfigs.get("extEsUser")
-        String extEsPassword = context.config.otherConfigs.get("extEsPassword")
-        String jdbcPg14Database1 = "jdbc_es_14_database1"
-        String jdbcPg14Table1 = "jdbc_es_14_table1"
-
-
-        sql """drop database if exists ${jdbcPg14Database1};"""
-        sql """create database ${jdbcPg14Database1};"""
-        sql """use ${jdbcPg14Database1};"""
-        sql """drop table if exists ${jdbcPg14Table1};"""
-
-        sql """
-            CREATE EXTERNAL TABLE `${jdbcPg14Table1}` (
-              `name` varchar(20) COMMENT "",
-              `age` varchar(20) COMMENT ""
-            ) ENGINE=ELASTICSEARCH
-            PROPERTIES (
-            "hosts" = "http://${extEsHost}:${extEsPort}";,
-            "index" = "helloworld",
-            "user" = "${extEsUser}",
-            "password" = "${extEsPassword}"
-            );
-            """
-        def res=sql """show create table ${jdbcPg14Table1};"""
-        logger.info("recoding desc res: "+ res.toString())
-
-        def res1=sql "select * from ${jdbcPg14Table1};"
-        logger.info("recoding all: " + res1.toString())
-
-        sql """drop table if exists ${jdbcPg14Table1};"""
-        sql """drop database if exists ${jdbcPg14Database1};"""
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to