This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 863a652011e [fix](ES Catalog)Make sure ES meta is synced before using (#46781) 863a652011e is described below commit 863a652011eaf81c92cddc8f5e76a7103468cf90 Author: qiye <l...@selectdb.com> AuthorDate: Mon Feb 10 17:48:28 2025 +0800 [fix](ES Catalog)Make sure ES meta is synced before using (#46781) ### What problem does this PR solve? Issue Number: close #46780 Problem Summary: 1. Added calls to `EsResource.fillUrlsWithSchema` and `initEsMetaStateTracker` to ensure ES table URLs are correctly parsed and the metadata tracker is initialized before use. 2. Introduced an `executeWithRetry` helper in the regression tests to handle retries for ES queries that may fail due to unsynchronized metadata. 3. Removed unused ES tests and config. --- .../java/org/apache/doris/catalog/EsTable.java | 18 +++++-- regression-test/conf/regression-conf.groovy | 7 --- .../external_table_p0/es/test_es_query.groovy | 30 +++++++++++- .../es/test_es_query_no_http_url.groovy | 31 ++++++++++-- .../es/test_external_catalog_es.groovy | 52 -------------------- .../external_table_p2/es/test_external_es.groovy | 56 ---------------------- 6 files changed, 72 insertions(+), 122 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index a3ed061ed48..a9ee65ef74b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -152,17 +152,26 @@ public class EsTable extends Table implements GsonPostProcessable { } public Map<String, String> fieldsContext() throws UserException { + initEsMetaStateTracker(); return esMetaStateTracker.searchContext().fetchFieldsContext(); } public Map<String, String> docValueContext() throws UserException { + initEsMetaStateTracker(); return esMetaStateTracker.searchContext().docValueFieldsContext(); } public List<String> needCompatDateFields() throws 
UserException { + initEsMetaStateTracker(); return esMetaStateTracker.searchContext().needCompatDateFields(); } + private void initEsMetaStateTracker() { + if (esMetaStateTracker == null) { + esMetaStateTracker = new EsMetaStateTracker(client, this); + } + } + private void validate(Map<String, String> properties) throws DdlException { EsResource.valid(properties, false); if (properties.containsKey(EsResource.USER)) { @@ -315,9 +324,12 @@ public class EsTable extends Table implements GsonPostProcessable { } else { throw new IOException("invalid partition type: " + partType); } + // parse httpSslEnabled before use it here. + EsResource.fillUrlsWithSchema(seeds, httpSslEnabled); client = new EsRestClient(seeds, userName, passwd, httpSslEnabled); } + @Override public void gsonPostProcess() throws IOException { hosts = tableContext.get("hosts"); seeds = hosts.split(","); @@ -346,6 +358,8 @@ public class EsTable extends Table implements GsonPostProcessable { includeHiddenIndex = Boolean.parseBoolean(tableContext.getOrDefault(EsResource.INCLUDE_HIDDEN_INDEX, EsResource.INCLUDE_HIDDEN_INDEX_DEFAULT_VALUE)); + // parse httpSslEnabled before use it here. + EsResource.fillUrlsWithSchema(seeds, httpSslEnabled); client = new EsRestClient(seeds, userName, passwd, httpSslEnabled); } @@ -353,9 +367,7 @@ public class EsTable extends Table implements GsonPostProcessable { * Sync es index meta from remote ES Cluster. 
*/ public void syncTableMetaData() { - if (esMetaStateTracker == null) { - esMetaStateTracker = new EsMetaStateTracker(client, this); - } + initEsMetaStateTracker(); try { esMetaStateTracker.run(); this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions(); diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index fd4259fbdd9..c30d04674f5 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -199,13 +199,6 @@ extPgPort = 5432 extPgUser = "****" extPgPassword = "***********" -// elasticsearch external test config for bigdata -enableExternalEsTest = false -extEsHost = "***********" -extEsPort = 9200 -extEsUser = "*******" -extEsPassword = "***********" - // minio external test config enableExternalMinioTest = false extMinioHost = "***.**.**.**" diff --git a/regression-test/suites/external_table_p0/es/test_es_query.groovy b/regression-test/suites/external_table_p0/es/test_es_query.groovy index 1645fa6af51..22789b9ebe1 100644 --- a/regression-test/suites/external_table_p0/es/test_es_query.groovy +++ b/regression-test/suites/external_table_p0/es/test_es_query.groovy @@ -28,6 +28,8 @@ suite("test_es_query", "p0,external,es,external_docker,external_docker_es") { sql """drop catalog if exists test_es_query_es6;""" sql """drop catalog if exists test_es_query_es7;""" sql """drop catalog if exists test_es_query_es8;""" + sql """drop catalog if exists es6_hide;""" + sql """drop catalog if exists es7_hide;""" sql """drop table if exists test_v1;""" sql """drop table if exists test_v2;""" @@ -166,9 +168,35 @@ suite("test_es_query", "p0,external,es,external_docker,external_docker_es") { ); """ + def executeWithRetry = { query, queryName, maxRetries -> + def retryCount = 0 + def success = false + + while (!success && retryCount < maxRetries) { + try { + sql query + success = true + } catch (Exception e) { + if (e.getMessage().contains("EsTable metadata has 
not been synced, Try it later")) { + logger.error("Failed to execute ${queryName}: ${e.getMessage()}") + logger.info("Retrying... Attempt ${retryCount + 1}") + retryCount++ + sleep(1000) // Sleep for 1 second + } else { + throw e // Rethrow if it's a different exception + } + } + } + + if (!success) { + throw new RuntimeException("Failed to execute ${queryName} after ${maxRetries} attempts") + } + } + def query_catalogs = { -> sql """switch internal""" sql """use regression_test_external_table_p0_es""" + executeWithRetry("""select * from test_v1 where test2='text#1'""", "sql01", 30) order_qt_sql01 """select * from test_v1 where test2='text#1'""" order_qt_sql02 """select * from test_v1 where esquery(test2, '{"match":{"test2":"text#1"}}')""" order_qt_sql03 """select test4,test5,test6,test7,test8 from test_v1 order by test8""" @@ -182,7 +210,7 @@ suite("test_es_query", "p0,external,es,external_docker,external_docker_es") { order_qt_sql11 """select test6 from test_v1;""" order_qt_sql12 """select test9 from test_v1;""" - order_qt_sql20 """select * from test_v2 where test2='text#1'""" + executeWithRetry("""select * from test_v2 where test2='text#1'""", "sql20", 30) order_qt_sql21 """select * from test_v2 where esquery(test2, '{"match":{"test2":"text#1"}}')""" order_qt_sql22 """select test4,test5,test6,test7,test8 from test_v2 order by test8""" order_qt_sql23 """select * from test_v2 where esquery(c_long, '{"term":{"c_long":"-1"}}');""" diff --git a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy index 004c1aea31e..f5219c1509e 100644 --- a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy +++ b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy @@ -23,6 +23,31 @@ suite("test_es_query_no_http_url", "p0,external,es,external_docker,external_dock String es_7_port = context.config.otherConfigs.get("es_7_port") String 
es_8_port = context.config.otherConfigs.get("es_8_port") + def executeWithRetry = { query, queryName, maxRetries -> + def retryCount = 0 + def success = false + + while (!success && retryCount < maxRetries) { + try { + sql query + success = true + } catch (Exception e) { + if (e.getMessage().contains("EsTable metadata has not been synced, Try it later")) { + logger.error("Failed to execute ${queryName}: ${e.getMessage()}") + logger.info("Retrying... Attempt ${retryCount + 1}") + retryCount++ + sleep(1000) // Sleep for 1 second + } else { + throw e // Rethrow if it's a different exception + } + } + } + + if (!success) { + throw new RuntimeException("Failed to execute ${queryName} after ${maxRetries} attempts") + } + } + sql """drop catalog if exists es6_no_http_url;""" sql """drop catalog if exists es7_no_http_url;""" sql """drop catalog if exists es8_no_http_url;""" @@ -95,9 +120,9 @@ suite("test_es_query_no_http_url", "p0,external,es,external_docker,external_dock "http_ssl_enabled"="false" ); """ - order_qt_sql51 """select * from test_v1_no_http_url where test2='text#1'""" + executeWithRetry("""select * from test_v1_no_http_url where test2='text#1'""", "sql51", 30) - sql """ + sql """ CREATE TABLE `test_v2_no_http_url` ( `c_datetime` array<datev2> NULL, `c_long` array<bigint(20)> NULL, @@ -133,7 +158,7 @@ suite("test_es_query_no_http_url", "p0,external,es,external_docker,external_dock "http_ssl_enabled"="false" ); """ - order_qt_sql52 """select * from test_v2_no_http_url where test2='text#1'""" + executeWithRetry("""select * from test_v2_no_http_url where test2='text#1'""", "sql52", 30) // es6 sql """switch es6_no_http_url""" diff --git a/regression-test/suites/external_table_p2/es/test_external_catalog_es.groovy b/regression-test/suites/external_table_p2/es/test_external_catalog_es.groovy deleted file mode 100644 index 5412bc736c7..00000000000 --- a/regression-test/suites/external_table_p2/es/test_external_catalog_es.groovy +++ /dev/null @@ -1,52 +0,0 @@ -// 
Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -//import org.postgresql.Driver -suite("test_external_catalog_es", "p2,external,es,external_remote,external_remote_es") { - Boolean ignoreP2 = true; - if (ignoreP2) { - logger.info("disable p2 test"); - return; - } - - String enabled = context.config.otherConfigs.get("enableExternalEsTest") - if (enabled != null && enabled.equalsIgnoreCase("true")) { - String extEsHost = context.config.otherConfigs.get("extEsHost") - String extEsPort = context.config.otherConfigs.get("extEsPort") - String extEsUser = context.config.otherConfigs.get("extEsUser") - String extEsPassword = context.config.otherConfigs.get("extEsPassword") - String esCatalogName ="es7_catalog_name" - - String jdbcPg14Table1 = "accounts" - - sql """drop catalog if exists ${esCatalogName}""" - - sql """ - CREATE CATALOG ${esCatalogName} PROPERTIES ( - "type"="es", - "elasticsearch.hosts"="http://${extEsHost}:${extEsPort}", - "elasticsearch.nodes_discovery"="false", - "elasticsearch.username"="${extEsUser}", - "elasticsearch.password"="${extEsPassword}" - ); - """ - - qt_sql "select * from ${esCatalogName}.default_db.${jdbcPg14Table1} order by account_number limit 10;" - - sql """drop catalog if exists 
${esCatalogName};""" - - } -} diff --git a/regression-test/suites/external_table_p2/es/test_external_es.groovy b/regression-test/suites/external_table_p2/es/test_external_es.groovy deleted file mode 100644 index fcec9b7de3e..00000000000 --- a/regression-test/suites/external_table_p2/es/test_external_es.groovy +++ /dev/null @@ -1,56 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
-//import org.postgresql.Driver -suite("test_external_es", "p2,external,es,external_remote,external_remote_es") { - - String enabled = context.config.otherConfigs.get("enableExternalEsTest") - if (enabled != null && enabled.equalsIgnoreCase("true")) { - String extEsHost = context.config.otherConfigs.get("extEsHost") - String extEsPort = context.config.otherConfigs.get("extEsPort") - String extEsUser = context.config.otherConfigs.get("extEsUser") - String extEsPassword = context.config.otherConfigs.get("extEsPassword") - String jdbcPg14Database1 = "jdbc_es_14_database1" - String jdbcPg14Table1 = "jdbc_es_14_table1" - - - sql """drop database if exists ${jdbcPg14Database1};""" - sql """create database ${jdbcPg14Database1};""" - sql """use ${jdbcPg14Database1};""" - sql """drop table if exists ${jdbcPg14Table1};""" - - sql """ - CREATE EXTERNAL TABLE `${jdbcPg14Table1}` ( - `name` varchar(20) COMMENT "", - `age` varchar(20) COMMENT "" - ) ENGINE=ELASTICSEARCH - PROPERTIES ( - "hosts" = "http://${extEsHost}:${extEsPort}", - "index" = "helloworld", - "user" = "${extEsUser}", - "password" = "${extEsPassword}" - ); - """ - def res=sql """show create table ${jdbcPg14Table1};""" - logger.info("recoding desc res: "+ res.toString()) - - def res1=sql "select * from ${jdbcPg14Table1};" - logger.info("recoding all: " + res1.toString()) - - sql """drop table if exists ${jdbcPg14Table1};""" - sql """drop database if exists ${jdbcPg14Database1};""" - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org