This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch revert-41330-jdbc_lower_mapping21
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4e7d45c371f8a0fd1f3999bb51dab2f953b8de2a
Author: zy-kkk <zhongy...@gmail.com>
AuthorDate: Wed Oct 9 13:58:24 2024 +0800

    Revert "[branch-2.1][improvement](jdbc catalog) Optimize JdbcCatalog case map…"
    
    This reverts commit 3eda77b3d9f1a6eba6977b581122e05142590a8f.
---
 .../apache/doris/datasource/ExternalCatalog.java   |  18 --
 .../apache/doris/datasource/ExternalDatabase.java  |   7 -
 .../doris/datasource/jdbc/JdbcExternalCatalog.java |  56 +---
 .../doris/datasource/jdbc/JdbcExternalTable.java   |  25 +-
 .../datasource/jdbc/JdbcIdentifierMapping.java     |  45 +++
 .../doris/datasource/jdbc/client/JdbcClient.java   |  46 ++-
 .../datasource/jdbc/client/JdbcMySQLClient.java    |   4 +-
 .../datasource/jdbc/client/JdbcOracleClient.java   |   4 +-
 .../mapping/DefaultIdentifierMapping.java          | 268 ------------------
 .../datasource/mapping/IdentifierMapping.java      | 307 ++++++++++++++++++++-
 10 files changed, 409 insertions(+), 371 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 1b3cd9c33a1..f6e5a570cc9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -45,7 +45,6 @@ import 
org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
 import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
 import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase;
 import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
-import org.apache.doris.datasource.mapping.IdentifierMapping;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
 import org.apache.doris.datasource.metacache.MetaCache;
 import org.apache.doris.datasource.operations.ExternalMetadataOps;
@@ -143,9 +142,6 @@ public abstract class ExternalCatalog
     protected Optional<Boolean> useMetaCache = Optional.empty();
     protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
 
-    protected IdentifierMapping identifierMapping;
-    private boolean mappingsInitialized = false;
-
     public ExternalCatalog() {
     }
 
@@ -178,10 +174,6 @@ public abstract class ExternalCatalog
         }
     }
 
-    // only for forward to master
-    protected void buildDatabaseMapping() {
-    }
-
     // Will be called when creating catalog(so when as replaying)
     // to add some default properties if missing.
     public void setDefaultPropsIfMissing(boolean isReplay) {
@@ -210,10 +202,6 @@ public abstract class ExternalCatalog
      */
     public abstract List<String> listTableNames(SessionContext ctx, String 
dbName);
 
-    // only for forward to master
-    protected void buildTableMapping(SessionContext ctx, String dbName) {
-    }
-
     /**
      * check if the specified table exist.
      *
@@ -278,10 +266,6 @@ public abstract class ExternalCatalog
             }
             initialized = true;
         }
-        if (!mappingsInitialized) {
-            buildDatabaseMapping();
-            mappingsInitialized = true;
-        }
     }
 
     protected final void initLocalObjects() {
@@ -407,7 +391,6 @@ public abstract class ExternalCatalog
     public void onRefresh(boolean invalidCache) {
         this.objectCreated = false;
         this.initialized = false;
-        this.mappingsInitialized = false;
         synchronized (this.propLock) {
             this.convertedProperties = null;
         }
@@ -733,7 +716,6 @@ public abstract class ExternalCatalog
         }
         this.propLock = new byte[0];
         this.initialized = false;
-        this.mappingsInitialized = false;
         setDefaultPropsIfMissing(true);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
index cf65a5f0a48..d653a5a178e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
@@ -91,8 +91,6 @@ public abstract class ExternalDatabase<T extends 
ExternalTable>
 
     private MetaCache<T> metaCache;
 
-    private boolean mappingsInitialized = false;
-
     /**
      * Create external database.
      *
@@ -119,7 +117,6 @@ public abstract class ExternalDatabase<T extends 
ExternalTable>
 
     public void setUnInitialized(boolean invalidCache) {
         this.initialized = false;
-        this.mappingsInitialized = false;
         this.invalidCacheInInit = invalidCache;
         if (extCatalog.getUseMetaCache().isPresent()) {
             if (extCatalog.getUseMetaCache().get() && metaCache != null) {
@@ -173,10 +170,6 @@ public abstract class ExternalDatabase<T extends 
ExternalTable>
             }
             initialized = true;
         }
-        if (!mappingsInitialized) {
-            extCatalog.buildTableMapping(null, name);
-            mappingsInitialized = true;
-        }
     }
 
     public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index d25f97e70bd..80cc0f554f6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -31,7 +31,6 @@ import org.apache.doris.datasource.SessionContext;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.jdbc.client.JdbcClientException;
-import org.apache.doris.datasource.mapping.DefaultIdentifierMapping;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest;
 import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult;
@@ -119,16 +118,19 @@ public class JdbcExternalCatalog extends ExternalCatalog {
         super.onRefresh(invalidCache);
         if (jdbcClient != null) {
             jdbcClient.closeClient();
-            jdbcClient = null;
         }
     }
 
+    @Override
+    public void onRefreshCache(boolean invalidCache) {
+        onRefresh(invalidCache);
+    }
+
     @Override
     public void onClose() {
         super.onClose();
         if (jdbcClient != null) {
             jdbcClient.closeClient();
-            jdbcClient = null;
         }
     }
 
@@ -229,6 +231,8 @@ public class JdbcExternalCatalog extends ExternalCatalog {
                 .setDriverUrl(getDriverUrl())
                 .setDriverClass(getDriverClass())
                 .setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
+                .setIsLowerCaseMetaNames(getLowerCaseMetaNames())
+                .setMetaNamesMapping(getMetaNamesMapping())
                 .setIncludeDatabaseMap(getIncludeDatabaseMap())
                 .setExcludeDatabaseMap(getExcludeDatabaseMap())
                 .setConnectionPoolMinSize(getConnectionPoolMinSize())
@@ -238,62 +242,22 @@ public class JdbcExternalCatalog extends ExternalCatalog {
                 .setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());
 
         jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig);
-        identifierMapping = new 
DefaultIdentifierMapping(Boolean.parseBoolean(getLowerCaseMetaNames()),
-                getMetaNamesMapping());
     }
 
-    @Override
     protected List<String> listDatabaseNames() {
-        return 
identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList());
-    }
-
-    @Override
-    protected void buildDatabaseMapping() {
-        
identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList());
-    }
-
-    protected String getRemoteDatabaseName(String dbName) {
-        return identifierMapping.toRemoteDatabaseName(dbName);
+        return jdbcClient.getDatabaseNameList();
     }
 
     @Override
     public List<String> listTableNames(SessionContext ctx, String dbName) {
         makeSureInitialized();
-        String remoteDbName = getRemoteDatabaseName(dbName);
-        return identifierMapping.fromRemoteTableName(remoteDbName, 
jdbcClient.getTablesNameList(remoteDbName));
-    }
-
-    @Override
-    protected void buildTableMapping(SessionContext ctx, String dbName) {
-        String remoteDbName = getRemoteDatabaseName(dbName);
-        identifierMapping.fromRemoteTableName(getRemoteDatabaseName(dbName),
-                jdbcClient.getTablesNameList(remoteDbName));
-    }
-
-    protected String getRemoteTableName(String dbName, String tblName) {
-        return 
identifierMapping.toRemoteTableName(getRemoteDatabaseName(dbName), tblName);
+        return jdbcClient.getTablesNameList(dbName);
     }
 
     @Override
     public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
         makeSureInitialized();
-        String remoteDbName = getRemoteDatabaseName(dbName);
-        String remoteTblName = getRemoteTableName(dbName, tblName);
-        return jdbcClient.isTableExist(remoteDbName, remoteTblName);
-    }
-
-    public List<Column> listColumns(String dbName, String tblName) {
-        makeSureInitialized();
-        String remoteDbName = getRemoteDatabaseName(dbName);
-        String remoteTblName = getRemoteTableName(dbName, tblName);
-        return identifierMapping.fromRemoteColumnName(remoteDbName, 
remoteTblName,
-                jdbcClient.getColumnsFromJdbc(remoteDbName,
-                        remoteTblName));
-    }
-
-    protected Map<String, String> getRemoteColumnNames(String dbName, String 
tblName) {
-        return 
identifierMapping.toRemoteColumnNames(getRemoteDatabaseName(dbName),
-                getRemoteTableName(dbName, tblName));
+        return jdbcClient.isTableExist(dbName, tblName);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
index a3af7f5b820..07ce183a589 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
@@ -32,7 +32,6 @@ import org.apache.doris.statistics.ResultRow;
 import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.thrift.TTableDescriptor;
 
-import com.google.common.collect.Maps;
 import org.apache.commons.text.StringSubstitutor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -87,29 +86,21 @@ public class JdbcExternalTable extends ExternalTable {
 
     @Override
     public Optional<SchemaCacheValue> initSchema() {
-        return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) 
catalog).listColumns(dbName, name)));
+        return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) 
catalog).getJdbcClient()
+                .getColumnsFromJdbc(dbName, name)));
     }
 
     private JdbcTable toJdbcTable() {
         List<Column> schema = getFullSchema();
         JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
-        String fullTableName = this.dbName + "." + this.name;
-        JdbcTable jdbcTable = new JdbcTable(this.id, fullTableName, schema, 
TableType.JDBC_EXTERNAL_TABLE);
-        jdbcCatalog.configureJdbcTable(jdbcTable, fullTableName);
+        String fullDbName = this.dbName + "." + this.name;
+        JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, 
TableType.JDBC_EXTERNAL_TABLE);
+        jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName);
 
         // Set remote properties
-        
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getRemoteDatabaseName(this.dbName));
-        
jdbcTable.setRemoteTableName(jdbcCatalog.getRemoteTableName(this.dbName, 
this.name));
-        Map<String, String> remoteColumnNames = 
jdbcCatalog.getRemoteColumnNames(this.dbName, this.name);
-        if (!remoteColumnNames.isEmpty()) {
-            jdbcTable.setRemoteColumnNames(remoteColumnNames);
-        } else {
-            remoteColumnNames = Maps.newHashMap();
-            for (Column column : schema) {
-                remoteColumnNames.put(column.getName(), column.getName());
-            }
-            jdbcTable.setRemoteColumnNames(remoteColumnNames);
-        }
+        
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName));
+        
jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName,
 this.name));
+        
jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName,
 this.name));
 
         return jdbcTable;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java
new file mode 100644
index 00000000000..20a74724b3e
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.jdbc;
+
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.datasource.mapping.IdentifierMapping;
+
+public class JdbcIdentifierMapping extends IdentifierMapping {
+    private final JdbcClient jdbcClient;
+
+    public JdbcIdentifierMapping(boolean isLowerCaseMetaNames, String 
metaNamesMapping, JdbcClient jdbcClient) {
+        super(isLowerCaseMetaNames, metaNamesMapping);
+        this.jdbcClient = jdbcClient;
+    }
+
+    @Override
+    protected void loadDatabaseNames() {
+        jdbcClient.getDatabaseNameList();
+    }
+
+    @Override
+    protected void loadTableNames(String localDbName) {
+        jdbcClient.getTablesNameList(localDbName);
+    }
+
+    @Override
+    protected void loadColumnNames(String localDbName, String localTableName) {
+        jdbcClient.getColumnsFromJdbc(localDbName, localTableName);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
index 2b82c074809..0e57f989df3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.jdbc.JdbcIdentifierMapping;
 import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema;
 
 import com.google.common.collect.ImmutableSet;
@@ -61,8 +62,11 @@ public abstract class JdbcClient {
     protected ClassLoader classLoader = null;
     protected HikariDataSource dataSource = null;
     protected boolean isOnlySpecifiedDatabase;
+    protected boolean isLowerCaseMetaNames;
+    protected String metaNamesMapping;
     protected Map<String, Boolean> includeDatabaseMap;
     protected Map<String, Boolean> excludeDatabaseMap;
+    protected JdbcIdentifierMapping jdbcLowerCaseMetaMatching;
 
     public static JdbcClient createJdbcClient(JdbcClientConfig 
jdbcClientConfig) {
         String dbType = parseDbType(jdbcClientConfig.getJdbcUrl());
@@ -97,6 +101,8 @@ public abstract class JdbcClient {
         this.catalogName = jdbcClientConfig.getCatalog();
         this.jdbcUser = jdbcClientConfig.getUser();
         this.isOnlySpecifiedDatabase = 
Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase());
+        this.isLowerCaseMetaNames = 
Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseMetaNames());
+        this.metaNamesMapping = jdbcClientConfig.getMetaNamesMapping();
         this.includeDatabaseMap =
                 
Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap());
         this.excludeDatabaseMap =
@@ -105,6 +111,7 @@ public abstract class JdbcClient {
         this.dbType = parseDbType(jdbcUrl);
         initializeClassLoader(jdbcClientConfig);
         initializeDataSource(jdbcClientConfig);
+        this.jdbcLowerCaseMetaMatching = new 
JdbcIdentifierMapping(isLowerCaseMetaNames, metaNamesMapping, this);
     }
 
     // Initialize DataSource
@@ -287,9 +294,10 @@ public abstract class JdbcClient {
     /**
      * get all tables of one database
      */
-    public List<String> getTablesNameList(String remoteDbName) {
+    public List<String> getTablesNameList(String localDbName) {
         List<String> remoteTablesNames = Lists.newArrayList();
         String[] tableTypes = getTableTypes();
+        String remoteDbName = getRemoteDatabaseName(localDbName);
         processTable(remoteDbName, null, tableTypes, (rs) -> {
             try {
                 while (rs.next()) {
@@ -299,12 +307,14 @@ public abstract class JdbcClient {
                 throw new JdbcClientException("failed to get all tables for 
remote database: `%s`", e, remoteDbName);
             }
         });
-        return remoteTablesNames;
+        return filterTableNames(remoteDbName, remoteTablesNames);
     }
 
-    public boolean isTableExist(String remoteDbName, String remoteTableName) {
+    public boolean isTableExist(String localDbName, String localTableName) {
         final boolean[] isExist = {false};
         String[] tableTypes = getTableTypes();
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
         processTable(remoteDbName, remoteTableName, tableTypes, (rs) -> {
             try {
                 if (rs.next()) {
@@ -321,10 +331,12 @@ public abstract class JdbcClient {
     /**
      * get all columns of one table
      */
-    public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName, 
String remoteTableName) {
+    public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String 
localTableName) {
         Connection conn = getConnection();
         ResultSet rs = null;
         List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
         try {
             DatabaseMetaData databaseMetaData = conn.getMetaData();
             String catalogName = getCatalogName(conn);
@@ -350,7 +362,21 @@ public abstract class JdbcClient {
                     field.isAllowNull(), field.getRemarks(),
                     true, -1));
         }
-        return dorisTableSchema;
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
+        return filterColumnName(remoteDbName, remoteTableName, 
dorisTableSchema);
+    }
+
+    public String getRemoteDatabaseName(String localDbname) {
+        return jdbcLowerCaseMetaMatching.getRemoteDatabaseName(localDbname);
+    }
+
+    public String getRemoteTableName(String localDbName, String 
localTableName) {
+        return jdbcLowerCaseMetaMatching.getRemoteTableName(localDbName, 
localTableName);
+    }
+
+    public Map<String, String> getRemoteColumnNames(String localDbName, String 
localTableName) {
+        return jdbcLowerCaseMetaMatching.getRemoteColumnNames(localDbName, 
localTableName);
     }
 
     // protected methods,for subclass to override
@@ -408,7 +434,7 @@ public abstract class JdbcClient {
             }
             filteredDatabaseNames.add(databaseName);
         }
-        return filteredDatabaseNames;
+        return 
jdbcLowerCaseMetaMatching.setDatabaseNameMapping(filteredDatabaseNames);
     }
 
     protected Set<String> getFilterInternalDatabases() {
@@ -419,6 +445,14 @@ public abstract class JdbcClient {
                 .build();
     }
 
+    protected List<String> filterTableNames(String remoteDbName, List<String> 
remoteTableNames) {
+        return jdbcLowerCaseMetaMatching.setTableNameMapping(remoteDbName, 
remoteTableNames);
+    }
+
+    protected List<Column> filterColumnName(String remoteDbName, String 
remoteTableName, List<Column> remoteColumns) {
+        return jdbcLowerCaseMetaMatching.setColumnNameMapping(remoteDbName, 
remoteTableName, remoteColumns);
+    }
+
     protected abstract Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema);
 
     protected Type createDecimalOrStringType(int precision, int scale) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
index 3baa2ce9d91..5624392de14 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
@@ -129,10 +129,12 @@ public class JdbcMySQLClient extends JdbcClient {
      * get all columns of one table
      */
     @Override
-    public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName, 
String remoteTableName) {
+    public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String 
localTableName) {
         Connection conn = getConnection();
         ResultSet rs = null;
         List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
         try {
             DatabaseMetaData databaseMetaData = conn.getMetaData();
             String catalogName = getCatalogName(conn);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
index dc367e8ea6e..d37b36cbf3d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
@@ -49,10 +49,12 @@ public class JdbcOracleClient extends JdbcClient {
     }
 
     @Override
-    public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName, 
String remoteTableName) {
+    public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String 
localTableName) {
         Connection conn = getConnection();
         ResultSet rs = null;
         List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
         try {
             DatabaseMetaData databaseMetaData = conn.getMetaData();
             String catalogName = getCatalogName(conn);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java
deleted file mode 100644
index 4847cd86e6d..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/DefaultIdentifierMapping.java
+++ /dev/null
@@ -1,268 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.mapping;
-
-import org.apache.doris.catalog.Column;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class DefaultIdentifierMapping implements IdentifierMapping {
-    private static final Logger LOG = 
LogManager.getLogger(DefaultIdentifierMapping.class);
-
-    private final ObjectMapper mapper = new ObjectMapper();
-    private final ConcurrentHashMap<String, String> localDBToRemoteDB = new 
ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> 
localTableToRemoteTable
-            = new ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
ConcurrentHashMap<String, String>>>
-            localColumnToRemoteColumn = new ConcurrentHashMap<>();
-
-    private final boolean isLowerCaseMetaNames;
-    private final String metaNamesMapping;
-
-    public DefaultIdentifierMapping(boolean isLowerCaseMetaNames, String 
metaNamesMapping) {
-        this.isLowerCaseMetaNames = isLowerCaseMetaNames;
-        this.metaNamesMapping = metaNamesMapping;
-    }
-
-    private boolean isMappingInvalid() {
-        return metaNamesMapping == null || metaNamesMapping.isEmpty();
-    }
-
-    @Override
-    public List<String> fromRemoteDatabaseName(List<String> 
remoteDatabaseNames) {
-        // If mapping is not required, return the original input
-        if (!isLowerCaseMetaNames && isMappingInvalid()) {
-            return remoteDatabaseNames;
-        }
-        JsonNode databasesNode = readAndParseJson(metaNamesMapping, 
"databases");
-
-        Map<String, String> databaseNameMapping = Maps.newTreeMap();
-        if (databasesNode.isArray()) {
-            for (JsonNode node : databasesNode) {
-                String remoteDatabase = node.path("remoteDatabase").asText();
-                String mapping = node.path("mapping").asText();
-                databaseNameMapping.put(remoteDatabase, mapping);
-            }
-        }
-
-        Map<String, List<String>> result = 
nameListToMapping(remoteDatabaseNames, localDBToRemoteDB,
-                databaseNameMapping, isLowerCaseMetaNames);
-        List<String> localDatabaseNames = result.get("localNames");
-        List<String> conflictNames = result.get("conflictNames");
-        if (!conflictNames.isEmpty()) {
-            throw new RuntimeException(
-                    "Conflict database/schema names found when 
lower_case_meta_names is true: " + conflictNames
-                            + ". Please set lower_case_meta_names to false or"
-                            + " use meta_name_mapping to specify the names.");
-        }
-        return localDatabaseNames;
-    }
-
-    @Override
-    public List<String> fromRemoteTableName(String remoteDbName, List<String> 
remoteTableNames) {
-        // If mapping is not required, return the original input
-        if (!isLowerCaseMetaNames && isMappingInvalid()) {
-            return remoteTableNames;
-        }
-        JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables");
-
-        Map<String, String> tableNameMapping = Maps.newTreeMap();
-        if (tablesNode.isArray()) {
-            for (JsonNode node : tablesNode) {
-                String remoteDatabase = node.path("remoteDatabase").asText();
-                if (remoteDbName.equals(remoteDatabase)) {
-                    String remoteTable = node.path("remoteTable").asText();
-                    String mapping = node.path("mapping").asText();
-                    tableNameMapping.put(remoteTable, mapping);
-                }
-            }
-        }
-
-        localTableToRemoteTable.putIfAbsent(remoteDbName, new 
ConcurrentHashMap<>());
-
-        Map<String, List<String>> result = nameListToMapping(remoteTableNames,
-                localTableToRemoteTable.get(remoteDbName),
-                tableNameMapping, isLowerCaseMetaNames);
-        List<String> localTableNames = result.get("localNames");
-        List<String> conflictNames = result.get("conflictNames");
-
-        if (!conflictNames.isEmpty()) {
-            throw new RuntimeException(
-                    "Conflict table names found in remote database/schema: " + 
remoteDbName
-                            + " when lower_case_meta_names is true: " + 
conflictNames
-                            + ". Please set lower_case_meta_names to false or"
-                            + " use meta_name_mapping to specify the table 
names.");
-        }
-        return localTableNames;
-    }
-
-    @Override
-    public List<Column> fromRemoteColumnName(String remoteDatabaseName, String 
remoteTableName,
-            List<Column> remoteColumns) {
-        // If mapping is not required, return the original input
-        if (!isLowerCaseMetaNames && isMappingInvalid()) {
-            return remoteColumns;
-        }
-        JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns");
-
-        Map<String, String> columnNameMapping = Maps.newTreeMap();
-        if (tablesNode.isArray()) {
-            for (JsonNode node : tablesNode) {
-                String remoteDatabase = node.path("remoteDatabase").asText();
-                String remoteTable = node.path("remoteTable").asText();
-                if (remoteDatabaseName.equals(remoteDatabase) && 
remoteTable.equals(remoteTableName)) {
-                    String remoteColumn = node.path("remoteColumn").asText();
-                    String mapping = node.path("mapping").asText();
-                    columnNameMapping.put(remoteColumn, mapping);
-                }
-            }
-        }
-        localColumnToRemoteColumn.putIfAbsent(remoteDatabaseName, new 
ConcurrentHashMap<>());
-        
localColumnToRemoteColumn.get(remoteDatabaseName).putIfAbsent(remoteTableName, 
new ConcurrentHashMap<>());
-
-        List<String> remoteColumnNames = Lists.newArrayList();
-        for (Column remoteColumn : remoteColumns) {
-            remoteColumnNames.add(remoteColumn.getName());
-        }
-
-        Map<String, List<String>> result = nameListToMapping(remoteColumnNames,
-                
localColumnToRemoteColumn.get(remoteDatabaseName).get(remoteTableName),
-                columnNameMapping, isLowerCaseMetaNames);
-        List<String> localColumnNames = result.get("localNames");
-        List<String> conflictNames = result.get("conflictNames");
-        if (!conflictNames.isEmpty()) {
-            throw new RuntimeException(
-                    "Conflict column names found in remote database/schema: " 
+ remoteDatabaseName
-                            + " in remote table: " + remoteTableName
-                            + " when lower_case_meta_names is true: " + 
conflictNames
-                            + ". Please set lower_case_meta_names to false or"
-                            + " use meta_name_mapping to specify the column 
names.");
-        }
-        for (int i = 0; i < remoteColumns.size(); i++) {
-            remoteColumns.get(i).setName(localColumnNames.get(i));
-        }
-        return remoteColumns;
-    }
-
-    @Override
-    public String toRemoteDatabaseName(String localDatabaseName) {
-        // If mapping is not required, return the original input
-        if (!isLowerCaseMetaNames && isMappingInvalid()) {
-            return localDatabaseName;
-        }
-        return getRequiredMapping(localDBToRemoteDB, localDatabaseName, 
"database", localDatabaseName);
-    }
-
-    @Override
-    public String toRemoteTableName(String remoteDatabaseName, String 
localTableName) {
-        // If mapping is not required, return the original input
-        if (!isLowerCaseMetaNames && isMappingInvalid()) {
-            return localTableName;
-        }
-        Map<String, String> tableMap = 
localTableToRemoteTable.computeIfAbsent(remoteDatabaseName,
-                k -> new ConcurrentHashMap<>());
-        return getRequiredMapping(tableMap, localTableName, "table", 
localTableName);
-    }
-
-    @Override
-    public Map<String, String> toRemoteColumnNames(String remoteDatabaseName, 
String remoteTableName) {
-        // If mapping is not required, return an empty map (since there's no 
mapping)
-        if (!isLowerCaseMetaNames && isMappingInvalid()) {
-            return Collections.emptyMap();
-        }
-        ConcurrentHashMap<String, ConcurrentHashMap<String, String>> 
tableColumnMap
-                = 
localColumnToRemoteColumn.computeIfAbsent(remoteDatabaseName, k -> new 
ConcurrentHashMap<>());
-        Map<String, String> columnMap = 
tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
-        if (columnMap.isEmpty()) {
-            LOG.warn("No remote column found for: {}. Please refresh this 
catalog.", remoteTableName);
-            throw new RuntimeException(
-                    "No remote column found for: " + remoteTableName + ". 
Please refresh this catalog.");
-        }
-        return columnMap;
-    }
-
-    private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName, 
String entityName) {
-        V value = map.get(key);
-        if (value == null) {
-            LOG.warn("No remote {} found for {}: {}. Please refresh this 
catalog.", typeName, typeName, entityName);
-            throw new RuntimeException("No remote " + typeName + " found for " 
+ typeName + ": " + entityName
-                    + ". Please refresh this catalog.");
-        }
-        return value;
-    }
-
-    private JsonNode readAndParseJson(String jsonPath, String nodeName) {
-        JsonNode rootNode;
-        try {
-            rootNode = mapper.readTree(jsonPath);
-            return rootNode.path(nodeName);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException("parse meta_names_mapping property 
error", e);
-        }
-    }
-
-    private Map<String, List<String>> nameListToMapping(List<String> 
remoteNames,
-            ConcurrentHashMap<String, String> localNameToRemoteName,
-            Map<String, String> nameMapping, boolean isLowerCaseMetaNames) {
-        List<String> filteredDatabaseNames = Lists.newArrayList();
-        Set<String> lowerCaseNames = Sets.newHashSet();
-        Map<String, List<String>> nameMap = Maps.newHashMap();
-        List<String> conflictNames = Lists.newArrayList();
-
-        for (String name : remoteNames) {
-            String mappedName = nameMapping.getOrDefault(name, name);
-            String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() 
: mappedName;
-
-            localNameToRemoteName.computeIfAbsent(localName, k -> name);
-
-            if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) {
-                if (nameMap.containsKey(localName)) {
-                    nameMap.get(localName).add(mappedName);
-                }
-            } else {
-                nameMap.putIfAbsent(localName, 
Lists.newArrayList(Collections.singletonList(mappedName)));
-            }
-
-            filteredDatabaseNames.add(localName);
-        }
-
-        for (List<String> conflictNameList : nameMap.values()) {
-            if (conflictNameList.size() > 1) {
-                conflictNames.addAll(conflictNameList);
-            }
-        }
-
-        Map<String, List<String>> result = Maps.newConcurrentMap();
-        result.put("localNames", filteredDatabaseNames);
-        result.put("conflictNames", conflictNames);
-        return result;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
index 7745a25d27d..363ef351152 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
@@ -18,20 +18,313 @@
 package org.apache.doris.datasource.mapping;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.qe.GlobalVariable;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class IdentifierMapping {
+    private static final Logger LOG = 
LogManager.getLogger(IdentifierMapping.class);
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final ConcurrentHashMap<String, String> localDBToRemoteDB = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> 
localTableToRemoteTable
+            = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
ConcurrentHashMap<String, String>>>
+            localColumnToRemoteColumn = new ConcurrentHashMap<>();
+
+    private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false);
+    private final ConcurrentHashMap<String, AtomicBoolean> tableNamesLoadedMap 
= new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
AtomicBoolean>> columnNamesLoadedMap
+            = new ConcurrentHashMap<>();
+
+    private final boolean isLowerCaseMetaNames;
+    private final String metaNamesMapping;
+
+    /**
+     * Stores the two settings that drive every name translation done by this mapping.
+     *
+     * @param isLowerCaseMetaNames flag handed to the name-mapping helper; when true, local names
+     *                             are lower-cased
+     * @param metaNamesMapping     JSON text whose "databases", "tables" and "columns" nodes are
+     *                             parsed whenever one of the set*NameMapping methods runs
+     */
+    public IdentifierMapping(boolean isLowerCaseMetaNames, String 
metaNamesMapping) {
+        this.isLowerCaseMetaNames = isLowerCaseMetaNames;
+        this.metaNamesMapping = metaNamesMapping;
+    }
+
+    /**
+     * Registers the remote database names and returns their local (mapped) names.
+     *
+     * <p>Explicit rules are read from the "databases" array of the meta names mapping JSON; each
+     * entry supplies a {@code remoteDatabase} and its {@code mapping}. The local-to-remote
+     * database lookup table is updated as a side effect.
+     *
+     * @param remoteDatabaseNames database names as reported by the remote source
+     * @return the local names, in the same order as {@code remoteDatabaseNames}
+     * @throws RuntimeException if the mapping helper reports conflicting names
+     */
+    public List<String> setDatabaseNameMapping(List<String> remoteDatabaseNames) {
+        Map<String, String> explicitRules = Maps.newTreeMap();
+        JsonNode rulesNode = readAndParseJson(metaNamesMapping, "databases");
+        if (rulesNode.isArray()) {
+            for (JsonNode rule : rulesNode) {
+                explicitRules.put(rule.path("remoteDatabase").asText(), rule.path("mapping").asText());
+            }
+        }
+
+        Map<String, List<String>> outcome = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB,
+                explicitRules, isLowerCaseMetaNames);
+        List<String> collisions = outcome.get("conflictNames");
+        if (collisions.isEmpty()) {
+            return outcome.get("localNames");
+        }
+        throw new RuntimeException(
+                "Conflict database/schema names found when lower_case_meta_names is true: " + collisions
+                        + ". Please set lower_case_meta_names to false or"
+                        + " use meta_name_mapping to specify the names.");
+    }
+
+    /**
+     * Registers the table names of one remote database and returns their local (mapped) names.
+     *
+     * <p>Explicit rules come from the "tables" array of the meta names mapping JSON; only entries
+     * whose {@code remoteDatabase} equals {@code remoteDbName} are applied. Local names are
+     * lower-cased when {@code GlobalVariable.lowerCaseTableNames == 1} or when
+     * {@code isLowerCaseMetaNames} is set. The per-database local-to-remote table lookup is
+     * updated as a side effect.
+     *
+     * @param remoteDbName     name of the database on the remote source
+     * @param remoteTableNames table names as reported by the remote source
+     * @return the local table names, in the same order as {@code remoteTableNames}
+     * @throws RuntimeException if the mapping helper reports conflicting names
+     */
+    public List<String> setTableNameMapping(String remoteDbName, List<String> remoteTableNames) {
+        JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables");
+
+        Map<String, String> tableNameMapping = Maps.newTreeMap();
+        if (tablesNode.isArray()) {
+            for (JsonNode node : tablesNode) {
+                String remoteDatabase = node.path("remoteDatabase").asText();
+                if (remoteDbName.equals(remoteDatabase)) {
+                    String remoteTable = node.path("remoteTable").asText();
+                    String mapping = node.path("mapping").asText();
+                    tableNameMapping.put(remoteTable, mapping);
+                }
+            }
+        }
+
+        ConcurrentHashMap<String, String> localToRemote
+                = localTableToRemoteTable.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>());
+
+        // lower_case_table_names = 1 forces lower-casing regardless of the catalog-level flag.
+        boolean forceLowerCase = GlobalVariable.lowerCaseTableNames == 1;
+        Map<String, List<String>> result = nameListToMapping(remoteTableNames, localToRemote,
+                tableNameMapping, forceLowerCase || isLowerCaseMetaNames);
+        List<String> conflictNames = result.get("conflictNames");
+        if (!conflictNames.isEmpty()) {
+            if (forceLowerCase) {
+                throw new RuntimeException(
+                        "Conflict table names found in remote database/schema: " + remoteDbName
+                                + " when lower_case_table_names is 1: " + conflictNames
+                                + ". Please use meta_name_mapping to specify the names.");
+            }
+            // The leading space before "when" keeps the database name from running into the text.
+            throw new RuntimeException(
+                    "Conflict table names found in remote database/schema: " + remoteDbName
+                            + " when lower_case_meta_names is true: " + conflictNames
+                            + ". Please set lower_case_meta_names to false or"
+                            + " use meta_name_mapping to specify the table names.");
+        }
+        return result.get("localNames");
+    }
+
+    /**
+     * Registers the column names of one remote table and renames the given columns in place.
+     *
+     * <p>Explicit rules come from the "columns" array of the meta names mapping JSON; only
+     * entries matching both {@code remoteDbName} and {@code remoteTableName} are applied. The
+     * per-table local-to-remote column lookup is updated as a side effect.
+     *
+     * @param remoteDbName    name of the database on the remote source
+     * @param remoteTableName name of the table on the remote source
+     * @param remoteColumns   columns carrying remote names; each one has its name replaced
+     * @return the same list instance, now carrying local names
+     * @throws RuntimeException if the mapping helper reports conflicting names
+     */
+    public List<Column> setColumnNameMapping(String remoteDbName, String remoteTableName,
+            List<Column> remoteColumns) {
+        Map<String, String> explicitRules = Maps.newTreeMap();
+        JsonNode columnsNode = readAndParseJson(metaNamesMapping, "columns");
+        if (columnsNode.isArray()) {
+            for (JsonNode rule : columnsNode) {
+                boolean sameDatabase = remoteDbName.equals(rule.path("remoteDatabase").asText());
+                boolean sameTable = rule.path("remoteTable").asText().equals(remoteTableName);
+                if (sameDatabase && sameTable) {
+                    explicitRules.put(rule.path("remoteColumn").asText(), rule.path("mapping").asText());
+                }
+            }
+        }
+
+        ConcurrentHashMap<String, String> localToRemote = localColumnToRemoteColumn
+                .computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>())
+                .computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
+
+        // Collect the remote names carried by the incoming columns.
+        List<String> remoteColumnNames = Lists.newArrayList();
+        for (Column column : remoteColumns) {
+            remoteColumnNames.add(column.getName());
+        }
+
+        Map<String, List<String>> outcome = nameListToMapping(remoteColumnNames, localToRemote,
+                explicitRules, isLowerCaseMetaNames);
+        List<String> collisions = outcome.get("conflictNames");
+        if (!collisions.isEmpty()) {
+            throw new RuntimeException(
+                    "Conflict column names found in remote database/schema: " + remoteDbName
+                            + " in remote table: " + remoteTableName
+                            + " when lower_case_meta_names is true: " + collisions
+                            + ". Please set lower_case_meta_names to false or"
+                            + " use meta_name_mapping to specify the column names.");
+        }
+
+        // Overwrite each column's remote name with the local name at the same position.
+        List<String> localNames = outcome.get("localNames");
+        int position = 0;
+        for (Column column : remoteColumns) {
+            column.setName(localNames.get(position++));
+        }
+        return remoteColumns;
+    }
+
+    /**
+     * Resolves a local database name to the name used on the remote source.
+     *
+     * <p>When no entry is known yet, the database names are loaded once before the lookup is
+     * retried.
+     *
+     * @param localDbName database name as seen locally
+     * @return the matching remote database name
+     * @throws RuntimeException if no remote name is known even after loading
+     */
+    public String getRemoteDatabaseName(String localDbName) {
+        Runnable loader = this::loadDatabaseNamesIfNeeded;
+        return getRequiredMapping(localDBToRemoteDB, localDbName, "database", loader, localDbName);
+    }
+
+    /**
+     * Resolves a local table name to the name used on the remote source.
+     *
+     * <p>The database is resolved first; the table lookup table is keyed by the remote database
+     * name. When the table is unknown, the table names of {@code localDbName} are loaded before
+     * the lookup is retried.
+     *
+     * @param localDbName    database name as seen locally
+     * @param localTableName table name as seen locally
+     * @return the matching remote table name
+     * @throws RuntimeException if the database or the table has no known remote name
+     */
+    public String getRemoteTableName(String localDbName, String localTableName) {
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        Map<String, String> localToRemote
+                = localTableToRemoteTable.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>());
+        Runnable loader = () -> loadTableNamesIfNeeded(localDbName);
+        return getRequiredMapping(localToRemote, localTableName, "table", loader, localTableName);
+    }
+
+    /**
+     * Returns the local-to-remote column name lookup for one table.
+     *
+     * <p>Database and table are resolved to their remote names first. If the lookup for that
+     * table is empty, the column names are loaded and the lookup is read again.
+     *
+     * @param localDbName    database name as seen locally
+     * @param localTableName table name as seen locally
+     * @return the non-empty lookup from local column name to remote column name
+     * @throws RuntimeException if the lookup is still empty after loading
+     */
+    public Map<String, String> getRemoteColumnNames(String localDbName, String localTableName) {
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, localTableName);
+        ConcurrentHashMap<String, ConcurrentHashMap<String, String>> perTable
+                = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>());
+        Map<String, String> localToRemote
+                = perTable.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
+        if (!localToRemote.isEmpty()) {
+            return localToRemote;
+        }
+
+        LOG.info("Column name mapping missing, loading column names for localDbName: {}, localTableName: {}",
+                localDbName, localTableName);
+        loadColumnNamesIfNeeded(localDbName, localTableName);
+        localToRemote = perTable.get(remoteTableName);
+        if (localToRemote.isEmpty()) {
+            LOG.warn("No remote column found for localTableName: {}. Please refresh this catalog.", localTableName);
+            throw new RuntimeException(
+                    "No remote column found for localTableName: " + localTableName + ". Please refresh this catalog.");
+        }
+        return localToRemote;
+    }
+
+
+    /**
+     * Runs {@link #loadDatabaseNames()} unless the loaded flag is already set.
+     *
+     * <p>The flag is set before loading starts. A failed load is logged rather than rethrown,
+     * and the flag is cleared again.
+     */
+    private void loadDatabaseNamesIfNeeded() {
+        boolean claimed = dbNamesLoaded.compareAndSet(false, true);
+        if (!claimed) {
+            return;
+        }
+        try {
+            loadDatabaseNames();
+        } catch (Exception e) {
+            // Reset on failure
+            dbNamesLoaded.set(false);
+            LOG.warn("Error loading database names", e);
+        }
+    }
+
+    /**
+     * Runs {@link #loadTableNames(String)} for a database unless its loaded flag is already set.
+     *
+     * <p>The flag is set before loading starts. A failed load is logged rather than rethrown,
+     * and the flag is cleared again.
+     *
+     * @param localDbName database name as seen locally; also the key of the loaded flag
+     */
+    private void loadTableNamesIfNeeded(String localDbName) {
+        AtomicBoolean loadedFlag = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false));
+        if (!loadedFlag.compareAndSet(false, true)) {
+            return;
+        }
+        try {
+            loadTableNames(localDbName);
+        } catch (Exception e) {
+            // Reset on failure
+            tableNamesLoadedMap.get(localDbName).set(false);
+            LOG.warn("Error loading table names for localDbName: {}", localDbName, e);
+        }
+    }
+
+    /**
+     * Runs {@link #loadColumnNames(String, String)} for a table unless its loaded flag is
+     * already set.
+     *
+     * <p>The flag is set before loading starts. A failed load is logged rather than rethrown,
+     * and the flag is cleared again.
+     *
+     * @param localDbName    database name as seen locally
+     * @param localTableName table name as seen locally
+     */
+    private void loadColumnNamesIfNeeded(String localDbName, String localTableName) {
+        AtomicBoolean loadedFlag = columnNamesLoadedMap
+                .computeIfAbsent(localDbName, k -> new ConcurrentHashMap<>())
+                .computeIfAbsent(localTableName, k -> new AtomicBoolean(false));
+        if (!loadedFlag.compareAndSet(false, true)) {
+            return;
+        }
+        try {
+            loadColumnNames(localDbName, localTableName);
+        } catch (Exception e) {
+            // Reset on failure
+            columnNamesLoadedMap.get(localDbName).get(localTableName).set(false);
+            LOG.warn("Error loading column names for localDbName: {}, localTableName: {}", localDbName,
+                    localTableName, e);
+        }
+    }
+
+    /**
+     * Looks up {@code key} in {@code map}, running a loader once if the entry is missing.
+     *
+     * @param map          lookup table to read
+     * @param key          key to resolve
+     * @param typeName     kind of entity, used only in log and error messages
+     * @param loadIfNeeded action run when the entry is missing, before the lookup is retried
+     * @param entityName   name reported in log and error messages
+     * @return the non-null value stored under {@code key}
+     * @throws RuntimeException if the entry is still missing after the loader ran
+     */
+    private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName, Runnable loadIfNeeded,
+            String entityName) {
+        boolean missing = map.isEmpty() || !map.containsKey(key) || map.get(key) == null;
+        if (missing) {
+            LOG.info("{} mapping missing, loading for {}: {}", typeName, typeName, entityName);
+            loadIfNeeded.run();
+        }
+        V resolved = map.get(key);
+        if (resolved != null) {
+            return resolved;
+        }
+        LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName);
+        throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName
+                + ". Please refresh this catalog.");
+    }
+
+    // Load the database names from the data source.
+    // In the corresponding getDatabaseNameList(), setDatabaseNameMapping()
+    // must be used to update the mapping.
+    protected abstract void loadDatabaseNames();
+
+    // Load the table names for the specified database from the data source.
+    // In the corresponding getTableNameList(), setTableNameMapping() must be
+    // used to update the mapping.
+    protected abstract void loadTableNames(String localDbName);
+
+    // Load the column names for a specified table in a database from the data
+    // source.
+    // In the corresponding getColumnNameList(), setColumnNameMapping() must
+    // be used to update the mapping.
+    protected abstract void loadColumnNames(String localDbName, String 
localTableName);
+
+    /**
+     * Parses JSON text and returns one named child of its root.
+     *
+     * @param jsonPath the JSON text itself (it is handed to the parser as content)
+     * @param nodeName name of the child node to return
+     * @return the result of {@code path(nodeName)} on the parsed root
+     * @throws RuntimeException wrapping the {@link JsonProcessingException} when parsing fails
+     */
+    private JsonNode readAndParseJson(String jsonPath, String nodeName) {
+        try {
+            return mapper.readTree(jsonPath).path(nodeName);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("parse meta_names_mapping property error", e);
+        }
+    }
+
+    // Translates remote names into local names and reports collisions.
+    // For every remote name: apply the explicit rule from nameMapping (or keep the name),
+    // lower-case it when isLowerCaseMetaNames is true, and record local -> remote in
+    // localNameToRemoteName unless that local name is already present.
+    // Returns a map with two keys: "localNames" (one entry per remote name, in input order)
+    // and "conflictNames" (the mapped names of every local name reached more than once).
+    // NOTE(review): toLowerCase() is called without an explicit Locale - confirm the default
+    // locale is acceptable here.
+    private Map<String, List<String>> nameListToMapping(List<String> 
remoteNames,
+            ConcurrentHashMap<String, String> localNameToRemoteName,
+            Map<String, String> nameMapping, boolean isLowerCaseMetaNames) {
+        List<String> filteredDatabaseNames = Lists.newArrayList();
+        Set<String> lowerCaseNames = Sets.newHashSet();
+        Map<String, List<String>> nameMap = Maps.newHashMap();
+        List<String> conflictNames = Lists.newArrayList();
 
-public interface IdentifierMapping {
-    List<String> fromRemoteDatabaseName(List<String> remoteDatabaseNames);
+        for (String name : remoteNames) {
+            String mappedName = nameMapping.getOrDefault(name, name);
+            String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() 
: mappedName;
 
-    List<String> fromRemoteTableName(String remoteDatabaseName, List<String> 
remoteTableNames);
+            // Use computeIfAbsent to ensure atomicity
+            localNameToRemoteName.computeIfAbsent(localName, k -> name);
 
-    List<Column> fromRemoteColumnName(String remoteDatabaseName, String 
remoteTableName, List<Column> remoteColumns);
+            if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) {
+                if (nameMap.containsKey(localName)) {
+                    nameMap.get(localName).add(mappedName);
+                }
+            } else {
+                nameMap.putIfAbsent(localName, 
Lists.newArrayList(Collections.singletonList(mappedName)));
+            }
 
-    String toRemoteDatabaseName(String localDatabaseName);
+            filteredDatabaseNames.add(localName);
+        }
 
-    String toRemoteTableName(String remoteDatabaseName, String localTableName);
+        for (List<String> conflictNameList : nameMap.values()) {
+            if (conflictNameList.size() > 1) {
+                conflictNames.addAll(conflictNameList);
+            }
+        }
 
-    Map<String, String> toRemoteColumnNames(String remoteDatabaseName, String 
remoteTableName);
+        Map<String, List<String>> result = Maps.newConcurrentMap();
+        result.put("localNames", filteredDatabaseNames);
+        result.put("conflictNames", conflictNames);
+        return result;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to