This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d94119ca24 [Refactor](catalog) Refactor Jdbc Catalog external name 
case mapping rules (#28414)
3d94119ca24 is described below

commit 3d94119ca243794f845027d2a2b3fce75fe0ab28
Author: zy-kkk <zhongy...@gmail.com>
AuthorDate: Mon Feb 19 11:19:44 2024 +0800

    [Refactor](catalog) Refactor Jdbc Catalog external name case mapping rules 
(#28414)
---
 .../docker-compose/mysql/init/02-create-db.sql     |   3 +
 .../docker-compose/mysql/init/03-create-table.sql  |  12 +
 .../docker-compose/mysql/init/04-insert.sql        |   4 +
 .../docker-compose/oracle/init/03-create-table.sql |   7 +
 .../docker-compose/oracle/init/04-insert.sql       |   2 +
 docs/en/docs/lakehouse/multi-catalog/jdbc.md       | 114 +++++++-
 docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md    | 116 +++++++-
 .../java/org/apache/doris/analysis/SlotRef.java    |   2 +-
 .../org/apache/doris/catalog/JdbcResource.java     |  10 +-
 .../java/org/apache/doris/catalog/JdbcTable.java   |  68 ++---
 .../java/org/apache/doris/catalog/Resource.java    |   2 +
 .../doris/catalog/external/JdbcExternalTable.java  |   9 +-
 .../apache/doris/datasource/CatalogFactory.java    |   2 +-
 .../doris/datasource/jdbc/JdbcExternalCatalog.java |  44 ++-
 .../datasource/jdbc/JdbcIdentifierMapping.java     |  45 ++++
 .../doris/datasource/jdbc/client/JdbcClient.java   | 258 +++++++-----------
 .../datasource/jdbc/client/JdbcClientConfig.java   |  23 +-
 .../datasource/jdbc/client/JdbcMySQLClient.java    |  39 +--
 .../datasource/jdbc/client/JdbcOracleClient.java   |  65 ++---
 .../datasource/mapping/IdentifierMapping.java      | 299 +++++++++++++++++++++
 .../doris/planner/external/jdbc/JdbcScanNode.java  |   6 +-
 .../doris/planner/external/jdbc/JdbcTableSink.java |   2 +-
 .../datasource/jdbc/JdbcExternalCatalogTest.java   |  25 +-
 .../jdbc/test_mysql_jdbc_catalog.out               |  28 ++
 .../jdbc/test_oracle_jdbc_catalog.out              |  17 ++
 .../jdbc/test_mysql_jdbc_catalog.groovy            |  52 ++++
 .../jdbc/test_oracle_jdbc_catalog.groovy           |  31 ++-
 27 files changed, 949 insertions(+), 336 deletions(-)

diff --git a/docker/thirdparties/docker-compose/mysql/init/02-create-db.sql 
b/docker/thirdparties/docker-compose/mysql/init/02-create-db.sql
index 84d7db56271..418de504da9 100644
--- a/docker/thirdparties/docker-compose/mysql/init/02-create-db.sql
+++ b/docker/thirdparties/docker-compose/mysql/init/02-create-db.sql
@@ -16,4 +16,7 @@
 -- under the License.
 
 create database doris_test;
+create database DORIS;
+create database Doris;
+create database doris;
 create database show_test_do_not_modify;
diff --git a/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql 
b/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql
index 4467851606a..cb8bb5d9cb4 100644
--- a/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql
+++ b/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql
@@ -331,3 +331,15 @@ CREATE TABLE doris_test.test_zd (
 `id` int(10) unsigned NOT NULL,
 `d_z` date NOT NULL
 );
+
+CREATE TABLE Doris.DORIS (
+  id varchar(128)
+);
+
+CREATE TABLE Doris.Doris (
+  id varchar(128)
+);
+
+CREATE TABLE Doris.doris (
+  id varchar(128)
+);
\ No newline at end of file
diff --git a/docker/thirdparties/docker-compose/mysql/init/04-insert.sql 
b/docker/thirdparties/docker-compose/mysql/init/04-insert.sql
index 588f9dd9159..bb1b0962b45 100644
--- a/docker/thirdparties/docker-compose/mysql/init/04-insert.sql
+++ b/docker/thirdparties/docker-compose/mysql/init/04-insert.sql
@@ -1160,3 +1160,7 @@ SET SESSION sql_mode=(SELECT 
REPLACE(@@sql_mode,'NO_ZERO_DATE',''));
 SET SESSION sql_mode=(SELECT REPLACE(@@sql_mode,'NO_ZERO_IN_DATE',''));
 
 insert into doris_test.test_zd (id,d_z) VALUES 
(1,'0000-00-00'),(2,'2022-01-01');
+
+insert into Doris.DORIS values ('DORIS');
+insert into Doris.Doris values ('Doris');
+insert into Doris.doris values ('doris');
diff --git a/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql 
b/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql
index 24bf6a7a856..35abc5d3762 100644
--- a/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql
+++ b/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql
@@ -175,3 +175,10 @@ create table doris_test.test_all_types (
   t6 interval year(3) to month,
   t7 interval day(3) to second(6)
 );
+
+create table doris_test.lower_test (
+"DORIS" varchar2(20),
+"Doris" varchar2(20),
+"doris" varchar2(20)
+);
+
diff --git a/docker/thirdparties/docker-compose/oracle/init/04-insert.sql 
b/docker/thirdparties/docker-compose/oracle/init/04-insert.sql
index 663851cfa4e..99d4476bdaf 100644
--- a/docker/thirdparties/docker-compose/oracle/init/04-insert.sql
+++ b/docker/thirdparties/docker-compose/oracle/init/04-insert.sql
@@ -103,4 +103,6 @@ null, null, null,
 null, null, null, null, null, null, null
 );
 
+insert into doris_test.lower_test values ('DORIS', 'Doris', 'doris');
+
 commit;
diff --git a/docs/en/docs/lakehouse/multi-catalog/jdbc.md 
b/docs/en/docs/lakehouse/multi-catalog/jdbc.md
index 859bfeceead..c7eb12332df 100644
--- a/docs/en/docs/lakehouse/multi-catalog/jdbc.md
+++ b/docs/en/docs/lakehouse/multi-catalog/jdbc.md
@@ -52,7 +52,8 @@ PROPERTIES ("key"="value", ...)
 | `driver_url `             | Yes             |               | JDBC Driver 
Jar                                                                             
                             |
 | `driver_class `           | Yes             |               | JDBC Driver 
Class                                                                           
                             |
 | `only_specified_database` | No              | "false"       | Whether only 
the database specified to be synchronized.                                      
                            |
-| `lower_case_table_names`  | No              | "false"       | Whether to 
synchronize the database name, table name and column name of jdbc external data 
source in lowercase.          |
+| `lower_case_meta_names`   | No              | "false"       | Whether to 
synchronize the database name, table name and column name of jdbc external data 
source in lowercase.          |
+| `meta_names_mapping`      | No              | ""            | When the jdbc 
external data source has the same name but different case, such as DORIS and 
doris, Doris reports an error when querying the Catalog due to ambiguity. In 
this case, the `meta_names_mapping` parameter needs to be configured to resolve 
the conflict. |
 | `include_database_list`   | No              | ""            | When 
only_specified_database=true,only synchronize the specified databases. split 
with ','. db name is case sensitive.   |
 | `exclude_database_list`   | No              | ""            | When 
only_specified_database=true,do not synchronize the specified databases. split 
with ','. db name is case sensitive. |
 
@@ -66,23 +67,118 @@ PROPERTIES ("key"="value", ...)
 
 3. HTTP address. For example, 
`https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar`.
 The system will download the Driver file from the HTTP address. This only 
supports HTTP services with no authentication requirements.
 
-### Lowercase table name synchronization
+### Lowercase name synchronization
 
-When `lower_case_table_names` is set to `true`, Doris is able to query 
non-lowercase databases and tables and columns by maintaining a mapping of 
lowercase names to actual names on the remote system
+When `lower_case_meta_names` is set to `true`, Doris maintains the mapping of 
lowercase names to actual names in the remote system, enabling queries to use 
lowercase to query non-lowercase databases, tables and columns of external data 
sources.
+
+Since FE has the `lower_case_table_names` parameter, it will affect the table 
name case rules during query, so the rules are as follows
+
+* When FE `lower_case_table_names` config is 0
+
+  lower_case_meta_names = false, the case is consistent with the source 
library.
+  lower_case_meta_names = true, lowercase repository table column names.
+
+* When FE `lower_case_table_names` config is 1
+
+  lower_case_meta_names = false, the case of db and column is consistent with 
the source library, but the table is stored in lowercase
+  lower_case_meta_names = true, lowercase repository table column names.
+
+* When FE `lower_case_table_names` config is 2
+
+  lower_case_meta_names = false, the case is consistent with the source 
library.
+  lower_case_meta_names = true, lowercase repository table column names.
+
+If the parameter configuration when creating the Catalog matches the lowercase 
conversion rule in the above rules, Doris will convert the corresponding name 
to lowercase and store it in Doris. When querying, you need to use the 
lowercase name displayed by Doris.
+
+If the external data source has the same name but different case, such as 
DORIS and doris, Doris will report an error when querying the Catalog due to 
ambiguity. In this case, you need to configure the `meta_names_mapping` 
parameter to resolve the conflict.
+
+The `meta_names_mapping` parameter accepts a Json format string with the 
following format:
+
+```json
+{
+  "databases": [
+    {
+      "remoteDatabase": "DORIS",
+      "mapping": "doris_1"
+    },
+    {
+      "remoteDatabase": "doris",
+      "mapping": "doris_2"
+    }],
+  "tables": [
+    {
+      "remoteDatabase": "DORIS",
+      "remoteTable": "DORIS",
+      "mapping": "doris_1"
+    },
+    {
+      "remoteDatabase": "DORIS",
+      "remoteTable": "doris",
+      "mapping": "doris_2"
+    }],
+  "columns": [
+    {
+      "remoteDatabase": "DORIS",
+      "remoteTable": "DORIS",
+      "remoteColumn": "DORIS",
+      "mapping": "doris_1"
+    },
+    {
+      "remoteDatabase": "DORIS",
+      "remoteTable": "DORIS",
+      "remoteColumn": "doris",
+      "mapping": "doris_2"
+    }]
+}
+```
+
+When filling this configuration into the statement that creates the Catalog, 
there are double quotes in Json, so you need to escape the double quotes or 
directly use single quotes to wrap the Json string when filling in.
+
+```sql
+CREATE CATALOG jdbc_catalog PROPERTIES (
+    ...
+    "meta_names_mapping" = 
"{\"databases\":[{\"remoteDatabase\":\"DORIS\",\"mapping\":\"doris_1\"},{\"remoteDatabase\":\"doris\",\"mapping\":\"doris_2\"}]}"
+    ...
+);
+```
+
+或者
+```sql
+CREATE CATALOG jdbc_catalog PROPERTIES (
+    ...
+    "meta_names_mapping" = 
'{"databases":[{"remoteDatabase":"DORIS","mapping":"doris_1"},{"remoteDatabase":"doris","mapping":"doris_2"}]}'
+    ...
+);
+
+```
 
 **Notice:**
 
-1. In versions before Doris 2.0.3, it is only valid for Oracle database. When 
querying, all library names and table names will be converted to uppercase 
before querying Oracle, for example:
+JDBC Catalog has the following three stages for mapping rules for external 
table case:
+
+* Doris versions prior to 2.0.3
+
+  This configuration name is `lower_case_table_names`, which is only valid for 
Oracle database. Setting this parameter to `true` in other data sources will 
affect the query, so please do not set it.
+
+  When querying Oracle, all library names and table names will be converted to 
uppercase before querying Oracle, for example:
+
+  Oracle has the TEST table in the TEST space. When Doris creates the Catalog, 
set `lower_case_table_names` to `true`, then Doris can query the TEST table 
through `select * from oracle_catalog.test.test`, and Doris will automatically 
format test.test into TEST.TEST is sent to Oracle. It should be noted that this 
is the default behavior, which also means that lowercase table names in Oracle 
cannot be queried.
+
+* Doris 2.0.3 version:
+
+  This configuration is called `lower_case_table_names` and is valid for all 
databases. When querying, all library names and table names will be converted 
into real names and then queried. If you upgrade from an old version to 2.0.3, 
you need ` Refresh <catalog_name>` can take effect.
+
+  However, if the library, table, or column names differ only in case, such as 
`Doris` and `doris`, Doris cannot query them due to ambiguity.
 
-   Oracle has the TEST table in the TEST space. When Doris creates the 
Catalog, set `lower_case_table_names` to `true`, then Doris can query the TEST 
table through `select * from oracle_catalog.test.test`, and Doris will 
automatically format test.test into TEST.TEST is sent to Oracle. It should be 
noted that this is the default behavior, which also means that lowercase table 
names in Oracle cannot be queried.
+  And when the `lower_case_table_names` parameter of the FE parameter is set 
to `1` or `2`, the `lower_case_table_names` parameter of the JDBC Catalog must 
be set to `true`. If the `lower_case_table_names` of the FE parameter is set to 
`0`, the JDBC Catalog parameter can be `true` or `false`, defaulting to `false`.
 
-   For other databases, you still need to specify the real library name and 
table name when querying.
+* Doris 2.1.0 and later versions:
 
-2. In Doris 2.0.3 and later versions, it is valid for all databases. When 
querying, all database names and table names and columns will be converted into 
real names and then queried. If you upgrade from an old version to 2.0. 3, 
`Refresh <catalog_name>` is required to take effect.
+  In order to avoid confusion with the `lower_case_table_names` parameter of 
FE conf, this configuration name is changed to `lower_case_meta_names`, which 
is valid for all databases. During query, all library names, table names and 
column names will be converted into real names, and then Check it out. If you 
upgrade from an old version to 2.0.4, you need `Refresh <catalog_name>` to take 
effect.
 
-   However, if the database or table or column names differ only in case, such 
as `Doris` and `doris`, Doris cannot query them due to ambiguity.
+  For specific rules, please refer to the introduction of 
`lower_case_meta_names` at the beginning of this section.
 
-3. When the FE parameter's `lower_case_table_names` is set to `1` or `2`, the 
JDBC Catalog's `lower_case_table_names` parameter must be set to `true`. If the 
FE parameter's `lower_case_table_names` is set to `0`, the JDBC Catalog 
parameter can be `true` or `false` and defaults to `false`. This ensures 
consistency and predictability in how Doris handles internal and external table 
configurations.
+  Users who have previously set the JDBC Catalog `lower_case_table_names` 
parameter will automatically have `lower_case_table_names` converted to 
`lower_case_meta_names` when upgrading to 2.0.4.
 
 ### Specify synchronization database:
 
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md 
b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
index 30c34d0f4ae..cae3d0e1222 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
@@ -44,14 +44,15 @@ PROPERTIES ("key"="value", ...)
 
 ## 参数说明
 
-| 参数                      | 必须 | 默认值  | 说明                                     
                               |
+| 参数                        | 必须 | 默认值  | 说明                                   
                                 |
 
|---------------------------|-----|---------|-----------------------------------------------------------------------|
 | `user`                    | 是   |         | 对应数据库的用户名                        
                                     |
 | `password`                | 是   |         | 对应数据库的密码                         
                                     |
 | `jdbc_url`                | 是   |         | JDBC 连接串                         
                                     |
 | `driver_url`              | 是   |         | JDBC Driver Jar 包名称              
                                     |
 | `driver_class`            | 是   |         | JDBC Driver Class 名称             
                                     |
-| `lower_case_table_names`  | 否   | "false" | 是否以小写的形式同步jdbc外部数据源的库名和表名以及列名    
                                     |
+| `lower_case_meta_names`   | 否   | "false" | 是否以小写的形式同步jdbc外部数据源的库名和表名以及列名    
                                     |
+| `meta_names_mapping`    | 否   | ""      | 当jdbc外部数据源存在名称相同只有大小写不同的情况,例如 
DORIS 和 doris,Doris 由于歧义而在查询 Catalog 时报错,此时需要配置 `meta_names_mapping` 参数来解决冲突。 |
 | `only_specified_database` | 否   | "false" | 指定是否只同步指定的 database              
                                     |
 | `include_database_list`   | 否   | ""      | 
当only_specified_database=true时,指定同步多个database,以','分隔。db名称是大小写敏感的。     |
 | `exclude_database_list`   | 否   | ""      | 
当only_specified_database=true时,指定不需要同步的多个database,以','分割。db名称是大小写敏感的。 |
@@ -66,23 +67,120 @@ PROPERTIES ("key"="value", ...)
 
 3. Http 
地址。如:`https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar`。系统会从这个
 http 地址下载 Driver 文件。仅支持无认证的 http 服务。
 
-### 小写表名同步
+### 小写名称同步
+
+当 `lower_case_meta_names` 设置为 `true` 时,Doris 
通过维护小写名称到远程系统中实际名称的映射,使查询时能够使用小写去查询外部数据源非小写的数据库和表以及列。
+
+由于 FE 存在 `lower_case_table_names` 的参数,会影响查询时的表名大小写规则,所以规则如下
+
+* 当 FE `lower_case_table_names` config 为 0 时
+
+   lower_case_meta_names = false,大小写和源库一致。
+   lower_case_meta_names = true,小写存储库表列名。
+
+* 当 FE `lower_case_table_names` config 为 1 时
+
+   lower_case_meta_names = false,db 和 column 的大小写和源库一致,但是 table 存储为小写
+   lower_case_meta_names = true,小写存储库表列名。
+
+* 当 FE `lower_case_table_names` config 为 2 时
+
+   lower_case_meta_names = false,大小写和源库一致。
+   lower_case_meta_names = true,小写存储库表列名。
+
+如果创建 Catalog 时的参数配置匹配到了上述规则中的转变小写规则,则 Doris 会将对应的名称转变为小写存储在 Doris 中,查询时需使用 
Doris 显示的小写名称去查询。
+
+如果外部数据源存在名称相同只有大小写不同的情况,例如 DORIS 和 doris,Doris 由于歧义而在查询 Catalog 时报错,此时需要配置 
`meta_names_mapping` 参数来解决冲突。
+
+`meta_names_mapping` 参数接受一个 Json 格式的字符串,格式如下:
+
+```json
+{
+  "databases": [
+    {
+      "remoteDatabase": "DORIS",
+      "mapping": "doris_1"
+    },
+    {
+      "remoteDatabase": "doris",
+      "mapping": "doris_2"
+    }],
+  "tables": [
+    {
+      "remoteDatabase": "DORIS",
+      "remoteTable": "DORIS",
+      "mapping": "doris_1"
+    },
+    {
+      "remoteDatabase": "DORIS",
+      "remoteTable": "doris",
+      "mapping": "doris_2"
+    }],
+  "columns": [
+    {
+      "remoteDatabase": "DORIS",
+      "remoteTable": "DORIS",
+      "remoteColumn": "DORIS",
+      "mapping": "doris_1"
+    },
+    {
+      "remoteDatabase": "DORIS",
+      "remoteTable": "DORIS",
+      "remoteColumn": "doris",
+      "mapping": "doris_2"
+    }]
+}
+```
+
+在将此配置填写到创建 Catalog 的语句中时,Json 中存在双引号,因此在填写时需要将双引号转义或者直接使用单引号包裹 Json 字符串。
+
+```sql
+CREATE CATALOG jdbc_catalog PROPERTIES (
+    ...
+    "meta_names_mapping" = 
"{\"databases\":[{\"remoteDatabase\":\"DORIS\",\"mapping\":\"doris_1\"},{\"remoteDatabase\":\"doris\",\"mapping\":\"doris_2\"}]}"
+    ...
+);
+```
+
+或者
+```sql
+CREATE CATALOG jdbc_catalog PROPERTIES (
+    ...
+    "meta_names_mapping" = 
'{"databases":[{"remoteDatabase":"DORIS","mapping":"doris_1"},{"remoteDatabase":"doris","mapping":"doris_2"}]}'
+    ...
+);
+
+```
+
 
-当 `lower_case_table_names` 设置为 `true` 时,Doris 
通过维护小写名称到远程系统中实际名称的映射,能够查询非小写的数据库和表以及列
 
 **注意:**
 
-1. 在 Doris 2.0.3 之前的版本,仅对 Oracle 数据库有效,在查询时,会将所有的库名和表名转换为大写,再去查询 Oracle,例如:
+JDBC Catalog 对于外部表大小写的映射规则存在如下三个阶段:
+
+* Doris 2.0.3 之前的版本
+
+    此配置名为 `lower_case_table_names`,仅对 Oracle 数据库有效,如在其他数据源设置此参数为 `true` 
会影响查询,请勿设置。
+    
+    在查询 Oracle 时,会将所有的库名和表名转换为大写,再去查询 Oracle,例如:
 
     Oracle 在 TEST 空间下有 TEST 表,Doris 创建 Catalog 时设置 `lower_case_table_names` 为 
`true`,则 Doris 可以通过 `select * from oracle_catalog.test.test` 查询到 TEST 表,Doris 
会自动将 test.test 格式化成 TEST.TEST 下发到 Oracle,需要注意的是这是个默认行为,也意味着不能查询 Oracle 中小写的表名。
 
-    对于其他数据库,仍需要在查询时指定真实的库名和表名。
+* Doris 2.0.3 版本:
 
-2. 在 Doris 2.0.3 及之后的版本,对所有的数据库都有效,在查询时,会将所有的库名和表名以及列名转换为真实的名称,再去查询,如果是从老版本升级到 
2.0.3 ,需要 `Refresh <catalog_name>` 才能生效。
+    此配置名为 
`lower_case_table_names`,对所有的数据库都有效,在查询时,会将所有的库名和表名转换为真实的名称,再去查询,如果是从老版本升级到 
2.0.3 ,需要 `Refresh <catalog_name>` 才能生效。
 
     但是,如果库名、表名或列名只有大小写不同,例如 `Doris` 和 `doris`,则 Doris 由于歧义而无法查询它们。
 
-3. 当 FE 参数的 `lower_case_table_names` 设置为 `1` 或 `2` 时,JDBC Catalog 的 
`lower_case_table_names` 参数必须设置为 `true`。如果 FE 参数的 `lower_case_table_names` 设置为 
`0`,则 JDBC Catalog 的参数可以为 `true` 或 `false`,默认为 `false`。这确保了 Doris 
在处理内部和外部表配置时的一致性和可预测性。
+    并且当 FE 参数的 `lower_case_table_names` 设置为 `1` 或 `2` 时,JDBC Catalog 的 
`lower_case_table_names` 参数必须设置为 `true`。如果 FE 参数的 `lower_case_table_names` 设置为 
`0`,则 JDBC Catalog 的参数可以为 `true` 或 `false`,默认为 `false`。
+
+* Doris 2.1.0 以及之后版本:
+
+    为了避免和 FE conf 的 `lower_case_table_names` 参数混淆,此配置名改为 
`lower_case_meta_names`,对所有的数据库都有效,在查询时,会将所有的库名和表名以及列名转换为真实的名称,再去查询,如果是从老版本升级到 
2.0.4 ,需要 `Refresh <catalog_name>` 才能生效。
+
+    具体规则参考本小节开始对于 `lower_case_meta_names` 的介绍。
+
+    此前设置过 JDBC Catalog `lower_case_table_names` 参数的用户会在升级到 2.0.4 时,自动将 
`lower_case_table_names` 转换为 `lower_case_meta_names`。
 
 ### 指定同步数据库
 
@@ -551,7 +649,7 @@ CREATE CATALOG jdbc_doris PROPERTIES (
 | BITMAP     | BITMAP                 | 
查询BITMAP需要设置`return_object_data_as_binary=true`  |
 | Other      | UNSUPPORTED            |                                        
              |
 
-### Clickhouse
+### ClickHouse
 
 #### 创建示例
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
index 7d9763ea694..78a39fadc59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
@@ -307,7 +307,7 @@ public class SlotRef extends Expr {
             if (tableType.equals(TableType.JDBC_EXTERNAL_TABLE) || 
tableType.equals(TableType.JDBC) || tableType
                     .equals(TableType.ODBC)) {
                 if (inputTable instanceof JdbcTable) {
-                    return ((JdbcTable) inputTable).getProperRealColumnName(
+                    return ((JdbcTable) inputTable).getProperRemoteColumnName(
                             ((JdbcTable) inputTable).getJdbcTableType(), col);
                 } else if (inputTable instanceof OdbcTable) {
                     return JdbcTable.databaseProperName(((OdbcTable) 
inputTable).getOdbcTableType(), col);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
index d95f77b0b7e..03bdd3d2f30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
@@ -95,7 +95,6 @@ public class JdbcResource extends Resource {
     public static final String DRIVER_URL = "driver_url";
     public static final String TYPE = "type";
     public static final String ONLY_SPECIFIED_DATABASE = 
"only_specified_database";
-    public static final String LOWER_CASE_TABLE_NAMES = 
"lower_case_table_names";
     public static final String CONNECTION_POOL_MIN_SIZE = 
"connection_pool_min_size";
     public static final String CONNECTION_POOL_MAX_SIZE = 
"connection_pool_max_size";
     public static final String CONNECTION_POOL_MAX_WAIT_TIME = 
"connection_pool_max_wait_time";
@@ -112,7 +111,8 @@ public class JdbcResource extends Resource {
             TYPE,
             CREATE_TIME,
             ONLY_SPECIFIED_DATABASE,
-            LOWER_CASE_TABLE_NAMES,
+            LOWER_CASE_META_NAMES,
+            META_NAMES_MAPPING,
             INCLUDE_DATABASE_LIST,
             EXCLUDE_DATABASE_LIST,
             CONNECTION_POOL_MIN_SIZE,
@@ -123,7 +123,8 @@ public class JdbcResource extends Resource {
     ).build();
     private static final ImmutableList<String> OPTIONAL_PROPERTIES = new 
ImmutableList.Builder<String>().add(
             ONLY_SPECIFIED_DATABASE,
-            LOWER_CASE_TABLE_NAMES,
+            LOWER_CASE_META_NAMES,
+            META_NAMES_MAPPING,
             INCLUDE_DATABASE_LIST,
             EXCLUDE_DATABASE_LIST,
             CONNECTION_POOL_MIN_SIZE,
@@ -139,7 +140,8 @@ public class JdbcResource extends Resource {
 
     static {
         OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ONLY_SPECIFIED_DATABASE, 
"false");
-        OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false");
+        OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_META_NAMES, "false");
+        OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(META_NAMES_MAPPING, "");
         OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, "");
         OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, "");
         OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MIN_SIZE, "1");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
index 38aa8a73bdd..4e62fb1acd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
@@ -53,9 +53,9 @@ public class JdbcTable extends Table {
 
     private static final String CATALOG_ID = "catalog_id";
     private static final String TABLE = "table";
-    private static final String REAL_DATABASE = "real_database";
-    private static final String REAL_TABLE = "real_table";
-    private static final String REAL_COLUMNS = "real_columns";
+    private static final String REMOTE_DATABASE = "remote_database";
+    private static final String REMOTE_TABLE = "remote_table";
+    private static final String REMOTE_COLUMNS = "remote_columns";
     private static final String RESOURCE = "resource";
     private static final String TABLE_TYPE = "table_type";
     private static final String URL = "jdbc_url";
@@ -69,9 +69,9 @@ public class JdbcTable extends Table {
     private String externalTableName;
 
     // real name only for jdbc catalog
-    private String realDatabaseName;
-    private String realTableName;
-    private Map<String, String> realColumnNames;
+    private String remoteDatabaseName;
+    private String remoteTableName;
+    private Map<String, String> remoteColumnNames;
 
     private String jdbcTypeName;
 
@@ -122,10 +122,10 @@ public class JdbcTable extends Table {
 
     public String getInsertSql(List<String> insertCols) {
         StringBuilder sb = new StringBuilder("INSERT INTO ");
-        
sb.append(getProperRealFullTableName(TABLE_TYPE_MAP.get(getTableTypeName())));
+        
sb.append(getProperRemoteFullTableName(TABLE_TYPE_MAP.get(getTableTypeName())));
         sb.append("(");
         List<String> transformedInsertCols = insertCols.stream()
-                .map(col -> 
getProperRealColumnName(TABLE_TYPE_MAP.get(getTableTypeName()), col))
+                .map(col -> 
getProperRemoteColumnName(TABLE_TYPE_MAP.get(getTableTypeName()), col))
                 .collect(Collectors.toList());
         sb.append(String.join(",", transformedInsertCols));
         sb.append(")");
@@ -249,9 +249,9 @@ public class JdbcTable extends Table {
         serializeMap.put(DRIVER_CLASS, driverClass);
         serializeMap.put(DRIVER_URL, driverUrl);
         serializeMap.put(CHECK_SUM, checkSum);
-        serializeMap.put(REAL_DATABASE, realDatabaseName);
-        serializeMap.put(REAL_TABLE, realTableName);
-        serializeMap.put(REAL_COLUMNS, 
objectMapper.writeValueAsString(realColumnNames));
+        serializeMap.put(REMOTE_DATABASE, remoteDatabaseName);
+        serializeMap.put(REMOTE_TABLE, remoteTableName);
+        serializeMap.put(REMOTE_COLUMNS, 
objectMapper.writeValueAsString(remoteColumnNames));
 
         int size = (int) serializeMap.values().stream().filter(v -> {
             return v != null;
@@ -287,14 +287,14 @@ public class JdbcTable extends Table {
         driverClass = serializeMap.get(DRIVER_CLASS);
         driverUrl = serializeMap.get(DRIVER_URL);
         checkSum = serializeMap.get(CHECK_SUM);
-        realDatabaseName = serializeMap.get(REAL_DATABASE);
-        realTableName = serializeMap.get(REAL_TABLE);
-        String realColumnNamesJson = serializeMap.get(REAL_COLUMNS);
+        remoteDatabaseName = serializeMap.get(REMOTE_DATABASE);
+        remoteTableName = serializeMap.get(REMOTE_TABLE);
+        String realColumnNamesJson = serializeMap.get(REMOTE_COLUMNS);
         if (realColumnNamesJson != null) {
-            realColumnNames = objectMapper.readValue(realColumnNamesJson, new 
TypeReference<Map<String, String>>() {
+            remoteColumnNames = objectMapper.readValue(realColumnNamesJson, 
new TypeReference<Map<String, String>>() {
             });
         } else {
-            realColumnNames = Maps.newHashMap();
+            remoteColumnNames = Maps.newHashMap();
         }
     }
 
@@ -306,28 +306,28 @@ public class JdbcTable extends Table {
         return externalTableName;
     }
 
-    public String getRealDatabaseName() {
-        return realDatabaseName;
+    public String getRemoteDatabaseName() {
+        return remoteDatabaseName;
     }
 
-    public String getRealTableName() {
-        return realTableName;
+    public String getRemoteTableName() {
+        return remoteTableName;
     }
 
-    public String getProperRealFullTableName(TOdbcTableType tableType) {
-        if (realDatabaseName == null || realTableName == null) {
+    public String getProperRemoteFullTableName(TOdbcTableType tableType) {
+        if (remoteDatabaseName == null || remoteTableName == null) {
             return databaseProperName(tableType, externalTableName);
         } else {
-            return properNameWithRealName(tableType, realDatabaseName) + "." + 
properNameWithRealName(tableType,
-                    realTableName);
+            return properNameWithRemoteName(tableType, remoteDatabaseName) + 
"." + properNameWithRemoteName(tableType,
+                    remoteTableName);
         }
     }
 
-    public String getProperRealColumnName(TOdbcTableType tableType, String 
columnName) {
-        if (realColumnNames == null || realColumnNames.isEmpty() || 
!realColumnNames.containsKey(columnName)) {
+    public String getProperRemoteColumnName(TOdbcTableType tableType, String 
columnName) {
+        if (remoteColumnNames == null || remoteColumnNames.isEmpty() || 
!remoteColumnNames.containsKey(columnName)) {
             return databaseProperName(tableType, columnName);
         } else {
-            return properNameWithRealName(tableType, 
realColumnNames.get(columnName));
+            return properNameWithRemoteName(tableType, 
remoteColumnNames.get(columnName));
         }
     }
 
@@ -496,13 +496,13 @@ public class JdbcTable extends Table {
         }
     }
 
-    public static String properNameWithRealName(TOdbcTableType tableType, 
String name) {
+    public static String properNameWithRemoteName(TOdbcTableType tableType, 
String remoteName) {
         switch (tableType) {
             case MYSQL:
             case OCEANBASE:
-                return formatNameWithRealName(name, "`", "`");
+                return formatNameWithRemoteName(remoteName, "`", "`");
             case SQLSERVER:
-                return formatNameWithRealName(name, "[", "]");
+                return formatNameWithRemoteName(remoteName, "[", "]");
             case POSTGRESQL:
             case CLICKHOUSE:
             case TRINO:
@@ -510,13 +510,13 @@ public class JdbcTable extends Table {
             case OCEANBASE_ORACLE:
             case ORACLE:
             case SAP_HANA:
-                return formatNameWithRealName(name, "\"", "\"");
+                return formatNameWithRemoteName(remoteName, "\"", "\"");
             default:
-                return name;
+                return remoteName;
         }
     }
 
-    public static String formatNameWithRealName(String name, String wrapStart, 
String wrapEnd) {
-        return wrapStart + name + wrapEnd;
+    public static String formatNameWithRemoteName(String remoteName, String 
wrapStart, String wrapEnd) {
+        return wrapStart + remoteName + wrapEnd;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
index 1a51d42f57a..88d3786e136 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
@@ -48,6 +48,8 @@ public abstract class Resource implements Writable, 
GsonPostProcessable {
     public static final String REFERENCE_SPLIT = "@";
     public static final String INCLUDE_DATABASE_LIST = "include_database_list";
     public static final String EXCLUDE_DATABASE_LIST = "exclude_database_list";
+    public static final String LOWER_CASE_META_NAMES = "lower_case_meta_names";
+    public static final String META_NAMES_MAPPING = "meta_names_mapping";
 
     public enum ResourceType {
         UNKNOWN,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
index b0a0654e908..8ef7e411c2a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java
@@ -84,10 +84,11 @@ public class JdbcExternalTable extends ExternalTable {
         JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, 
TableType.JDBC_EXTERNAL_TABLE);
         jdbcTable.setCatalogId(jdbcCatalog.getId());
         jdbcTable.setExternalTableName(fullDbName);
-        jdbcTable.setRealDatabaseName(((JdbcExternalCatalog) 
catalog).getJdbcClient().getRealDatabaseName(this.dbName));
-        jdbcTable.setRealTableName(
-                ((JdbcExternalCatalog) 
catalog).getJdbcClient().getRealTableName(this.dbName, this.name));
-        jdbcTable.setRealColumnNames(((JdbcExternalCatalog) 
catalog).getJdbcClient().getRealColumnNames(this.dbName,
+        jdbcTable.setRemoteDatabaseName(
+                ((JdbcExternalCatalog) 
catalog).getJdbcClient().getRemoteDatabaseName(this.dbName));
+        jdbcTable.setRemoteTableName(
+                ((JdbcExternalCatalog) 
catalog).getJdbcClient().getRemoteTableName(this.dbName, this.name));
+        jdbcTable.setRemoteColumnNames(((JdbcExternalCatalog) 
catalog).getJdbcClient().getRemoteColumnNames(this.dbName,
                 this.name));
         jdbcTable.setJdbcTypeName(jdbcCatalog.getDatabaseTypeName());
         jdbcTable.setJdbcUrl(jdbcCatalog.getJdbcUrl());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index 09ad69ec8b6..3bf674a8abd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -119,7 +119,7 @@ public class CatalogFactory {
                 catalog = new EsExternalCatalog(catalogId, name, resource, 
props, comment);
                 break;
             case "jdbc":
-                catalog = new JdbcExternalCatalog(catalogId, name, resource, 
props, comment);
+                catalog = new JdbcExternalCatalog(catalogId, name, resource, 
props, comment, isReplay);
                 break;
             case "iceberg":
                 catalog = 
IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props, 
comment);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index e99174c1c20..59c356f0df0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -26,7 +26,6 @@ import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
-import org.apache.doris.qe.GlobalVariable;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
@@ -34,12 +33,16 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 
 @Getter
 public class JdbcExternalCatalog extends ExternalCatalog {
+    private static final Logger LOG = 
LogManager.getLogger(JdbcExternalCatalog.class);
+
     private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
             JdbcResource.JDBC_URL,
             JdbcResource.DRIVER_URL,
@@ -51,10 +54,10 @@ public class JdbcExternalCatalog extends ExternalCatalog {
     private transient JdbcClient jdbcClient;
 
     public JdbcExternalCatalog(long catalogId, String name, String resource, 
Map<String, String> props,
-            String comment)
+            String comment, boolean isReplay)
             throws DdlException {
         super(catalogId, name, InitCatalogLog.Type.JDBC, comment);
-        this.catalogProperty = new CatalogProperty(resource, 
processCompatibleProperties(props));
+        this.catalogProperty = new CatalogProperty(resource, 
processCompatibleProperties(props, isReplay));
     }
 
     @Override
@@ -71,7 +74,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
         JdbcResource.validateProperties(propertiesWithoutCheckSum);
 
         
JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, 
getOnlySpecifiedDatabase());
-        JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_TABLE_NAMES, 
getLowerCaseTableNames());
+        JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_META_NAMES, 
getLowerCaseMetaNames());
         JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(), 
getIncludeDatabaseMap(),
                 getExcludeDatabaseMap());
         JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(), 
getConnectionPoolMaxSize(),
@@ -94,11 +97,24 @@ public class JdbcExternalCatalog extends ExternalCatalog {
         }
     }
 
-    private Map<String, String> processCompatibleProperties(Map<String, 
String> props) throws DdlException {
+    protected Map<String, String> processCompatibleProperties(Map<String, 
String> props, boolean isReplay)
+            throws DdlException {
         Map<String, String> properties = Maps.newHashMap();
         for (Map.Entry<String, String> kv : props.entrySet()) {
             properties.put(StringUtils.removeStart(kv.getKey(), 
JdbcResource.JDBC_PROPERTIES_PREFIX), kv.getValue());
         }
+
+        // Modify lower_case_table_names to lower_case_meta_names if it exists
+        if (properties.containsKey("lower_case_table_names") && isReplay) {
+            String lowerCaseTableNamesValue = 
properties.get("lower_case_table_names");
+            properties.put("lower_case_meta_names", lowerCaseTableNamesValue);
+            properties.remove("lower_case_table_names");
+            LOG.info("Modify lower_case_table_names to lower_case_meta_names, 
value: {}", lowerCaseTableNamesValue);
+        } else if (properties.containsKey("lower_case_table_names") && 
!isReplay) {
+            throw new DdlException("Jdbc catalog property 
lower_case_table_names is not supported,"
+                    + " please use lower_case_meta_names instead");
+        }
+
         String jdbcUrl = properties.getOrDefault(JdbcResource.JDBC_URL, "");
         if (!Strings.isNullOrEmpty(jdbcUrl)) {
             jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl);
@@ -145,15 +161,14 @@ public class JdbcExternalCatalog extends ExternalCatalog {
                 JdbcResource.ONLY_SPECIFIED_DATABASE));
     }
 
-    public String getLowerCaseTableNames() {
-        // Forced to true if Config.lower_case_table_names has a value of 1 or 
2
-        if (GlobalVariable.lowerCaseTableNames == 1 || 
GlobalVariable.lowerCaseTableNames == 2) {
-            return "true";
-        }
+    public String getLowerCaseMetaNames() {
+        return 
catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_META_NAMES, 
JdbcResource.getDefaultPropertyValue(
+                JdbcResource.LOWER_CASE_META_NAMES));
+    }
 
-        // Otherwise, it defaults to false
-        return 
catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, 
JdbcResource.getDefaultPropertyValue(
-                JdbcResource.LOWER_CASE_TABLE_NAMES));
+    public String getMetaNamesMapping() {
+        return catalogProperty.getOrDefault(JdbcResource.META_NAMES_MAPPING, 
JdbcResource.getDefaultPropertyValue(
+                JdbcResource.META_NAMES_MAPPING));
     }
 
     public int getConnectionPoolMinSize() {
@@ -191,7 +206,8 @@ public class JdbcExternalCatalog extends ExternalCatalog {
                 .setDriverUrl(getDriverUrl())
                 .setDriverClass(getDriverClass())
                 .setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
-                .setIsLowerCaseTableNames(getLowerCaseTableNames())
+                .setIsLowerCaseMetaNames(getLowerCaseMetaNames())
+                .setMetaNamesMapping(getMetaNamesMapping())
                 .setIncludeDatabaseMap(getIncludeDatabaseMap())
                 .setExcludeDatabaseMap(getExcludeDatabaseMap())
                 .setConnectionPoolMinSize(getConnectionPoolMinSize())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java
new file mode 100644
index 00000000000..82422518e68
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcIdentifierMapping.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.jdbc;
+
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.datasource.mapping.IdentifierMapping;
+
+public class JdbcIdentifierMapping extends IdentifierMapping {
+    private final JdbcClient jdbcClient;
+
+    public JdbcIdentifierMapping(boolean isLowerCaseMetaNames, String 
metaNamesMapping, JdbcClient jdbcClient) {
+        super(isLowerCaseMetaNames, metaNamesMapping);
+        this.jdbcClient = jdbcClient;
+    }
+
+    @Override
+    protected void loadDatabaseNames() {
+        jdbcClient.getDatabaseNameList();
+    }
+
+    @Override
+    protected void loadTableNames(String localDbName) {
+        jdbcClient.getTablesNameList(localDbName);
+    }
+
+    @Override
+    protected void loadColumnNames(String localDbName, String localTableName) {
+        jdbcClient.getJdbcColumnsInfo(localDbName, localTableName);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
index 6112398a096..85b24fdba07 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.jdbc.JdbcIdentifierMapping;
 
 import com.alibaba.druid.pool.DruidDataSource;
 import com.google.common.collect.Lists;
@@ -43,8 +44,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 @Getter
@@ -53,29 +52,17 @@ public abstract class JdbcClient {
     private static final int HTTP_TIMEOUT_MS = 10000;
     protected static final int JDBC_DATETIME_SCALE = 6;
 
-    private String catalog;
+    private String catalogName;
     protected String dbType;
     protected String jdbcUser;
     protected URLClassLoader classLoader = null;
     protected DruidDataSource dataSource = null;
     protected boolean isOnlySpecifiedDatabase;
-    protected boolean isLowerCaseTableNames;
-
+    protected boolean isLowerCaseMetaNames;
+    protected String metaNamesMapping;
     protected Map<String, Boolean> includeDatabaseMap;
     protected Map<String, Boolean> excludeDatabaseMap;
-    // only used when isLowerCaseTableNames = true.
-    protected final ConcurrentHashMap<String, String> lowerDBToRealDB = new 
ConcurrentHashMap<>();
-    // only used when isLowerCaseTableNames = true.
-    protected final ConcurrentHashMap<String, ConcurrentHashMap<String, 
String>> lowerTableToRealTable
-            = new ConcurrentHashMap<>();
-    // only used when isLowerCaseTableNames = true.
-    protected final ConcurrentHashMap<String, ConcurrentHashMap<String, 
ConcurrentHashMap<String, String>>>
-            lowerColumnToRealColumn = new ConcurrentHashMap<>();
-
-    private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false);
-    private final ConcurrentHashMap<String, AtomicBoolean> tableNamesLoadedMap 
= new ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
AtomicBoolean>> columnNamesLoadedMap
-            = new ConcurrentHashMap<>();
+    protected JdbcIdentifierMapping jdbcLowerCaseMetaMatching;
 
     public static JdbcClient createJdbcClient(JdbcClientConfig 
jdbcClientConfig) {
         String dbType = parseDbType(jdbcClientConfig.getJdbcUrl());
@@ -103,10 +90,11 @@ public abstract class JdbcClient {
     }
 
     protected JdbcClient(JdbcClientConfig jdbcClientConfig) {
-        this.catalog = jdbcClientConfig.getCatalog();
+        this.catalogName = jdbcClientConfig.getCatalog();
         this.jdbcUser = jdbcClientConfig.getUser();
         this.isOnlySpecifiedDatabase = 
Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase());
-        this.isLowerCaseTableNames = 
Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseTableNames());
+        this.isLowerCaseMetaNames = 
Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseMetaNames());
+        this.metaNamesMapping = jdbcClientConfig.getMetaNamesMapping();
         this.includeDatabaseMap =
                 
Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap());
         this.excludeDatabaseMap =
@@ -114,6 +102,7 @@ public abstract class JdbcClient {
         String jdbcUrl = jdbcClientConfig.getJdbcUrl();
         this.dbType = parseDbType(jdbcUrl);
         initializeDataSource(jdbcClientConfig);
+        this.jdbcLowerCaseMetaMatching = new 
JdbcIdentifierMapping(isLowerCaseMetaNames, metaNamesMapping, this);
     }
 
     // Initialize DruidDataSource
@@ -177,7 +166,7 @@ public abstract class JdbcClient {
             conn = dataSource.getConnection();
         } catch (Exception e) {
             String errorMessage = String.format("Can not connect to jdbc due 
to error: %s, Catalog name: %s", e,
-                    this.getCatalog());
+                    this.getCatalogName());
             throw new JdbcClientException(errorMessage, e);
         }
         return conn;
@@ -195,6 +184,25 @@ public abstract class JdbcClient {
         }
     }
 
+    /**
+     * Execute stmt direct via jdbc
+     *
+     * @param origStmt, the raw stmt string
+     */
+    public void executeStmt(String origStmt) {
+        Connection conn = getConnection();
+        Statement stmt = null;
+        try {
+            stmt = conn.createStatement();
+            int effectedRows = stmt.executeUpdate(origStmt);
+            LOG.debug("finished to execute dml stmt: {}, effected rows: {}", 
origStmt, effectedRows);
+        } catch (SQLException e) {
+            throw new JdbcClientException("Failed to execute stmt. error: " + 
e.getMessage(), e);
+        } finally {
+            close(stmt, conn);
+        }
+    }
+
     // This part used to process meta-information of database, table and 
column.
 
     /**
@@ -209,85 +217,54 @@ public abstract class JdbcClient {
         if (isOnlySpecifiedDatabase && includeDatabaseMap.isEmpty() && 
excludeDatabaseMap.isEmpty()) {
             return getSpecifiedDatabase(conn);
         }
-        List<String> databaseNames = Lists.newArrayList();
+        List<String> remoteDatabaseNames = Lists.newArrayList();
         try {
             stmt = conn.createStatement();
             String sql = getDatabaseQuery();
             rs = stmt.executeQuery(sql);
-            List<String> tempDatabaseNames = Lists.newArrayList();
             while (rs.next()) {
-                String databaseName = rs.getString(1);
-                if (isLowerCaseTableNames) {
-                    lowerDBToRealDB.put(databaseName.toLowerCase(), 
databaseName);
-                    databaseName = databaseName.toLowerCase();
-                } else {
-                    lowerDBToRealDB.put(databaseName, databaseName);
-                }
-                tempDatabaseNames.add(databaseName);
-            }
-            if (isOnlySpecifiedDatabase) {
-                for (String db : tempDatabaseNames) {
-                    // Exclude database map take effect with higher priority 
over include database map
-                    if (!excludeDatabaseMap.isEmpty() && 
excludeDatabaseMap.containsKey(db)) {
-                        continue;
-                    }
-                    if (!includeDatabaseMap.isEmpty() && 
!includeDatabaseMap.containsKey(db)) {
-                        continue;
-                    }
-                    databaseNames.add(db);
-                }
-            } else {
-                databaseNames = tempDatabaseNames;
+                remoteDatabaseNames.add(rs.getString(1));
             }
         } catch (SQLException e) {
             throw new JdbcClientException("failed to get database name list 
from jdbc", e);
         } finally {
             close(rs, stmt, conn);
         }
-        return databaseNames;
+        return filterDatabaseNames(remoteDatabaseNames);
     }
 
     /**
      * get all tables of one database
      */
-    public List<String> getTablesNameList(String dbName) {
-        List<String> tablesName = Lists.newArrayList();
+    public List<String> getTablesNameList(String localDbName) {
+        List<String> remoteTablesNames = Lists.newArrayList();
         String[] tableTypes = getTableTypes();
-        String finalDbName = getRealDatabaseName(dbName);
-        processTable(finalDbName, null, tableTypes, (rs) -> {
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        processTable(remoteDbName, null, tableTypes, (rs) -> {
             try {
                 while (rs.next()) {
-                    String tableName = rs.getString("TABLE_NAME");
-                    if (isLowerCaseTableNames) {
-                        lowerTableToRealTable.putIfAbsent(finalDbName, new 
ConcurrentHashMap<>());
-                        
lowerTableToRealTable.get(finalDbName).put(tableName.toLowerCase(), tableName);
-                        tableName = tableName.toLowerCase();
-                    } else {
-                        lowerTableToRealTable.putIfAbsent(finalDbName, new 
ConcurrentHashMap<>());
-                        lowerTableToRealTable.get(finalDbName).put(tableName, 
tableName);
-                    }
-                    tablesName.add(tableName);
+                    remoteTablesNames.add(rs.getString("TABLE_NAME"));
                 }
             } catch (SQLException e) {
-                throw new JdbcClientException("failed to get all tables for db 
%s", e, finalDbName);
+                throw new JdbcClientException("failed to get all tables for 
remote database %s", e, remoteDbName);
             }
         });
-        return tablesName;
+        return filterTableNames(remoteDbName, remoteTablesNames);
     }
 
-    public boolean isTableExist(String dbName, String tableName) {
+    public boolean isTableExist(String localDbName, String localTableName) {
         final boolean[] isExist = {false};
         String[] tableTypes = getTableTypes();
-        String finalDbName = getRealDatabaseName(dbName);
-        String finalTableName = getRealTableName(dbName, tableName);
-        processTable(finalDbName, finalTableName, tableTypes, (rs) -> {
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
+        processTable(remoteDbName, remoteTableName, tableTypes, (rs) -> {
             try {
                 if (rs.next()) {
                     isExist[0] = true;
                 }
             } catch (SQLException e) {
                 throw new JdbcClientException("failed to judge if table exist 
for table %s in db %s",
-                        e, finalTableName, finalDbName);
+                        e, remoteTableName, remoteDbName);
             }
         });
         return isExist[0];
@@ -296,29 +273,19 @@ public abstract class JdbcClient {
     /**
      * get all columns of one table
      */
-    public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String 
tableName) {
+    public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String 
localTableName) {
         Connection conn = getConnection();
         ResultSet rs = null;
         List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
-        String finalDbName = getRealDatabaseName(dbName);
-        String finalTableName = getRealTableName(dbName, tableName);
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
         try {
             DatabaseMetaData databaseMetaData = conn.getMetaData();
             String catalogName = getCatalogName(conn);
-            rs = getColumns(databaseMetaData, catalogName, finalDbName, 
finalTableName);
+            rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, 
remoteTableName);
             while (rs.next()) {
-                lowerColumnToRealColumn.putIfAbsent(finalDbName, new 
ConcurrentHashMap<>());
-                
lowerColumnToRealColumn.get(finalDbName).putIfAbsent(finalTableName, new 
ConcurrentHashMap<>());
                 JdbcFieldSchema field = new JdbcFieldSchema();
-                String columnName = rs.getString("COLUMN_NAME");
-                if (isLowerCaseTableNames) {
-                    
lowerColumnToRealColumn.get(finalDbName).get(finalTableName)
-                            .put(columnName.toLowerCase(), columnName);
-                    columnName = columnName.toLowerCase();
-                } else {
-                    
lowerColumnToRealColumn.get(finalDbName).get(finalTableName).put(columnName, 
columnName);
-                }
-                field.setColumnName(columnName);
+                field.setColumnName(rs.getString("COLUMN_NAME"));
                 field.setDataType(rs.getInt("DATA_TYPE"));
                 field.setDataTypeName(rs.getString("TYPE_NAME"));
                 /*
@@ -340,16 +307,16 @@ public abstract class JdbcClient {
                 tableSchema.add(field);
             }
         } catch (SQLException e) {
-            throw new JdbcClientException("failed to get table name list from 
jdbc for table %s:%s", e, finalTableName,
-                    Util.getRootCauseMessage(e));
+            throw new JdbcClientException("failed to get jdbc columns info for 
table %.%s: %s",
+                    e, localDbName, localTableName, 
Util.getRootCauseMessage(e));
         } finally {
             close(rs, conn);
         }
         return tableSchema;
     }
 
-    public List<Column> getColumnsFromJdbc(String dbName, String tableName) {
-        List<JdbcFieldSchema> jdbcTableSchema = getJdbcColumnsInfo(dbName, 
tableName);
+    public List<Column> getColumnsFromJdbc(String localDbName, String 
localTableName) {
+        List<JdbcFieldSchema> jdbcTableSchema = 
getJdbcColumnsInfo(localDbName, localTableName);
         List<Column> dorisTableSchema = 
Lists.newArrayListWithCapacity(jdbcTableSchema.size());
         for (JdbcFieldSchema field : jdbcTableSchema) {
             dorisTableSchema.add(new Column(field.getColumnName(),
@@ -357,71 +324,21 @@ public abstract class JdbcClient {
                     field.isAllowNull(), field.getRemarks(),
                     true, -1));
         }
-        return dorisTableSchema;
-    }
-
-    public String getRealDatabaseName(String dbname) {
-        if (lowerDBToRealDB == null
-                || lowerDBToRealDB.isEmpty()
-                || !lowerDBToRealDB.containsKey(dbname)) {
-            loadDatabaseNamesIfNeeded();
-        }
-
-        return lowerDBToRealDB.get(dbname);
-    }
-
-    public String getRealTableName(String dbName, String tableName) {
-        String realDbName = getRealDatabaseName(dbName);
-        if (lowerTableToRealTable == null
-                || lowerTableToRealTable.isEmpty()
-                || !lowerTableToRealTable.containsKey(realDbName)
-                || lowerTableToRealTable.get(realDbName) == null
-                || lowerTableToRealTable.get(realDbName).isEmpty()
-                || 
!lowerTableToRealTable.get(realDbName).containsKey(tableName)
-                || lowerTableToRealTable.get(realDbName).get(tableName) == 
null) {
-            loadTableNamesIfNeeded(dbName);
-        }
-
-        return lowerTableToRealTable.get(realDbName).get(tableName);
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
+        return filterColumnName(remoteDbName, remoteTableName, 
dorisTableSchema);
     }
 
-    public Map<String, String> getRealColumnNames(String dbName, String 
tableName) {
-        String realDbName = getRealDatabaseName(dbName);
-        String realTableName = getRealTableName(dbName, tableName);
-        if (lowerColumnToRealColumn == null
-                || lowerColumnToRealColumn.isEmpty()
-                || !lowerColumnToRealColumn.containsKey(realDbName)
-                || lowerColumnToRealColumn.get(realDbName) == null
-                || lowerColumnToRealColumn.get(realDbName).isEmpty()
-                || 
!lowerColumnToRealColumn.get(realDbName).containsKey(realTableName)
-                || lowerColumnToRealColumn.get(realDbName).get(realTableName) 
== null
-                || 
lowerColumnToRealColumn.get(realDbName).get(realTableName).isEmpty()) {
-            loadColumnNamesIfNeeded(dbName, tableName);
-        }
-        return lowerColumnToRealColumn.get(realDbName).get(realTableName);
+    public String getRemoteDatabaseName(String localDbname) {
+        return jdbcLowerCaseMetaMatching.getRemoteDatabaseName(localDbname);
     }
 
-    private void loadDatabaseNamesIfNeeded() {
-        if (dbNamesLoaded.compareAndSet(false, true)) {
-            getDatabaseNameList();
-        }
+    public String getRemoteTableName(String localDbName, String 
localTableName) {
+        return jdbcLowerCaseMetaMatching.getRemoteTableName(localDbName, 
localTableName);
     }
 
-    private void loadTableNamesIfNeeded(String dbName) {
-        AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(dbName, k 
-> new AtomicBoolean(false));
-        if (isLoaded.compareAndSet(false, true)) {
-            getTablesNameList(dbName);
-        }
-    }
-
-    private void loadColumnNamesIfNeeded(String dbName, String tableName) {
-        ConcurrentHashMap<String, AtomicBoolean> tableMap = 
columnNamesLoadedMap.computeIfAbsent(dbName,
-                k -> new ConcurrentHashMap<>());
-        AtomicBoolean isLoaded = tableMap.computeIfAbsent(tableName, k -> new 
AtomicBoolean(false));
-
-        if (isLoaded.compareAndSet(false, true)) {
-            getJdbcColumnsInfo(dbName, tableName);
-        }
+    public Map<String, String> getRemoteColumnNames(String localDbName, String 
localTableName) {
+        return jdbcLowerCaseMetaMatching.getRemoteColumnNames(localDbName, 
localTableName);
     }
 
     // protected methods,for subclass to override
@@ -447,14 +364,14 @@ public abstract class JdbcClient {
         return new String[] {"TABLE", "VIEW"};
     }
 
-    protected void processTable(String dbName, String tableName, String[] 
tableTypes,
+    protected void processTable(String remoteDbName, String remoteTableName, 
String[] tableTypes,
             Consumer<ResultSet> resultSetConsumer) {
         Connection conn = getConnection();
         ResultSet rs = null;
         try {
             DatabaseMetaData databaseMetaData = conn.getMetaData();
             String catalogName = getCatalogName(conn);
-            rs = databaseMetaData.getTables(catalogName, dbName, tableName, 
tableTypes);
+            rs = databaseMetaData.getTables(catalogName, remoteDbName, 
remoteTableName, tableTypes);
             resultSetConsumer.accept(rs);
         } catch (SQLException e) {
             throw new JdbcClientException("Failed to process table", e);
@@ -463,36 +380,41 @@ public abstract class JdbcClient {
         }
     }
 
-    protected String modifyTableNameIfNecessary(String tableName) {
-        return tableName;
+    protected String modifyTableNameIfNecessary(String remoteTableName) {
+        return remoteTableName;
     }
 
     protected boolean isTableModified(String modifiedTableName, String 
actualTableName) {
         return false;
     }
 
-    protected ResultSet getColumns(DatabaseMetaData databaseMetaData, String 
catalogName, String schemaName,
-            String tableName) throws SQLException {
-        return databaseMetaData.getColumns(catalogName, schemaName, tableName, 
null);
+    protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, 
String catalogName, String remoteDbName,
+            String remoteTableName) throws SQLException {
+        return databaseMetaData.getColumns(catalogName, remoteDbName, 
remoteTableName, null);
     }
 
-    /**
-     * Execute stmt direct via jdbc
-     *
-     * @param origStmt, the raw stmt string
-     */
-    public void executeStmt(String origStmt) {
-        Connection conn = getConnection();
-        Statement stmt = null;
-        try {
-            stmt = conn.createStatement();
-            int effectedRows = stmt.executeUpdate(origStmt);
-            LOG.debug("finished to execute dml stmt: {}, effected rows: {}", 
origStmt, effectedRows);
-        } catch (SQLException e) {
-            throw new JdbcClientException("Failed to execute stmt. error: " + 
e.getMessage(), e);
-        } finally {
-            close(stmt, conn);
+    protected List<String> filterDatabaseNames(List<String> remoteDbNames) {
+        List<String> filteredDatabaseNames = Lists.newArrayList();
+        for (String databaseName : remoteDbNames) {
+            if (isOnlySpecifiedDatabase) {
+                if (!excludeDatabaseMap.isEmpty() && 
excludeDatabaseMap.containsKey(databaseName)) {
+                    continue;
+                }
+                if (!includeDatabaseMap.isEmpty() && 
!includeDatabaseMap.containsKey(databaseName)) {
+                    continue;
+                }
+            }
+            filteredDatabaseNames.add(databaseName);
         }
+        return 
jdbcLowerCaseMetaMatching.setDatabaseNameMapping(filteredDatabaseNames);
+    }
+
+    protected List<String> filterTableNames(String remoteDbName, List<String> 
localTableNames) {
+        return jdbcLowerCaseMetaMatching.setTableNameMapping(remoteDbName, 
localTableNames);
+    }
+
+    protected List<Column> filterColumnName(String remoteDbName, String 
remoteTableName, List<Column> remoteColumns) {
+        return jdbcLowerCaseMetaMatching.setColumnNameMapping(remoteDbName, 
remoteTableName, remoteColumns);
     }
 
     @Data
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
index 41fac872e46..85f3bd8f256 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java
@@ -32,7 +32,8 @@ public class JdbcClientConfig implements Cloneable {
     private String driverUrl;
     private String driverClass;
     private String onlySpecifiedDatabase;
-    private String isLowerCaseTableNames;
+    private String isLowerCaseMetaNames;
+    private String metaNamesMapping;
     private int connectionPoolMinSize;
     private int connectionPoolMaxSize;
     private int connectionPoolMaxWaitTime;
@@ -45,7 +46,8 @@ public class JdbcClientConfig implements Cloneable {
 
     public JdbcClientConfig() {
         this.onlySpecifiedDatabase = 
JdbcResource.getDefaultPropertyValue(JdbcResource.ONLY_SPECIFIED_DATABASE);
-        this.isLowerCaseTableNames = 
JdbcResource.getDefaultPropertyValue(JdbcResource.LOWER_CASE_TABLE_NAMES);
+        this.isLowerCaseMetaNames = 
JdbcResource.getDefaultPropertyValue(JdbcResource.LOWER_CASE_META_NAMES);
+        this.metaNamesMapping = 
JdbcResource.getDefaultPropertyValue(JdbcResource.META_NAMES_MAPPING);
         this.connectionPoolMinSize = Integer.parseInt(
                 
JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MIN_SIZE));
         this.connectionPoolMaxSize = Integer.parseInt(
@@ -143,12 +145,21 @@ public class JdbcClientConfig implements Cloneable {
         return this;
     }
 
-    public String getIsLowerCaseTableNames() {
-        return isLowerCaseTableNames;
+    public String getIsLowerCaseMetaNames() {
+        return isLowerCaseMetaNames;
     }
 
-    public JdbcClientConfig setIsLowerCaseTableNames(String 
isLowerCaseTableNames) {
-        this.isLowerCaseTableNames = isLowerCaseTableNames;
+    public JdbcClientConfig setIsLowerCaseMetaNames(String 
isLowerCaseTableNames) {
+        this.isLowerCaseMetaNames = isLowerCaseTableNames;
+        return this;
+    }
+
+    public String getMetaNamesMapping() {
+        return metaNamesMapping;
+    }
+
+    public JdbcClientConfig setMetaNamesMapping(String metaNamesMapping) {
+        this.metaNamesMapping = metaNamesMapping;
         return this;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
index ace29c4198f..854911e7d56 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcMySQLClient.java
@@ -33,7 +33,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 
 public class JdbcMySQLClient extends JdbcClient {
@@ -81,14 +80,14 @@ public class JdbcMySQLClient extends JdbcClient {
     }
 
     @Override
-    protected void processTable(String dbName, String tableName, String[] 
tableTypes,
-                                Consumer<ResultSet> resultSetConsumer) {
+    protected void processTable(String remoteDbName, String remoteTableName, 
String[] tableTypes,
+            Consumer<ResultSet> resultSetConsumer) {
         Connection conn = null;
         ResultSet rs = null;
         try {
             conn = super.getConnection();
             DatabaseMetaData databaseMetaData = conn.getMetaData();
-            rs = databaseMetaData.getTables(dbName, null, tableName, 
tableTypes);
+            rs = databaseMetaData.getTables(remoteDbName, null, 
remoteTableName, tableTypes);
             resultSetConsumer.accept(rs);
         } catch (SQLException e) {
             throw new JdbcClientException("Failed to process table", e);
@@ -103,39 +102,29 @@ public class JdbcMySQLClient extends JdbcClient {
     }
 
     @Override
-    protected ResultSet getColumns(DatabaseMetaData databaseMetaData, String 
catalogName, String schemaName,
-                                   String tableName) throws SQLException {
-        return databaseMetaData.getColumns(schemaName, null, tableName, null);
+    protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, 
String catalogName, String remoteDbName,
+            String remoteTableName) throws SQLException {
+        return databaseMetaData.getColumns(remoteDbName, null, 
remoteTableName, null);
     }
 
     /**
      * get all columns of one table
      */
     @Override
-    public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String 
tableName) {
+    public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String 
localTableName) {
         Connection conn = getConnection();
         ResultSet rs = null;
-        List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
-        String finalDbName = getRealDatabaseName(dbName);
-        String finalTableName = getRealTableName(dbName, tableName);
+        List<JdbcFieldSchema> tableSchema = 
com.google.common.collect.Lists.newArrayList();
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
         try {
             DatabaseMetaData databaseMetaData = conn.getMetaData();
             String catalogName = getCatalogName(conn);
-            rs = getColumns(databaseMetaData, catalogName, finalDbName, 
finalTableName);
+            rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, 
remoteTableName);
             Map<String, String> mapFieldtoType = null;
             while (rs.next()) {
-                lowerColumnToRealColumn.putIfAbsent(finalDbName, new 
ConcurrentHashMap<>());
-                
lowerColumnToRealColumn.get(finalDbName).putIfAbsent(finalTableName, new 
ConcurrentHashMap<>());
                 JdbcFieldSchema field = new JdbcFieldSchema();
-                String columnName = rs.getString("COLUMN_NAME");
-                if (isLowerCaseTableNames) {
-                    
lowerColumnToRealColumn.get(finalDbName).get(finalTableName)
-                            .put(columnName.toLowerCase(), columnName);
-                    columnName = columnName.toLowerCase();
-                } else {
-                    
lowerColumnToRealColumn.get(finalDbName).get(finalTableName).put(columnName, 
columnName);
-                }
-                field.setColumnName(columnName);
+                field.setColumnName(rs.getString("COLUMN_NAME"));
                 field.setDataType(rs.getInt("DATA_TYPE"));
 
                 // in mysql-jdbc-connector-8.0.*, TYPE_NAME of the HLL column 
in doris will be "UNKNOWN"
@@ -144,7 +133,7 @@ public class JdbcMySQLClient extends JdbcClient {
                 // in mysql-jdbc-connector-5.1.*, TYPE_NAME of BITMAP column 
in doris will be "BITMAP"
                 field.setDataTypeName(rs.getString("TYPE_NAME"));
                 if (isDoris) {
-                    mapFieldtoType = getColumnsDataTypeUseQuery(dbName, 
tableName);
+                    mapFieldtoType = getColumnsDataTypeUseQuery(localDbName, 
localTableName);
                     
field.setDataTypeName(mapFieldtoType.get(rs.getString("COLUMN_NAME")));
                 }
                 field.setColumnSize(rs.getInt("COLUMN_SIZE"));
@@ -163,7 +152,7 @@ public class JdbcMySQLClient extends JdbcClient {
             }
         } catch (SQLException e) {
             throw new JdbcClientException("failed to get jdbc columns info for 
table %.%s: %s",
-                    e, dbName, tableName, Util.getRootCauseMessage(e));
+                    e, localDbName, localTableName, 
Util.getRootCauseMessage(e));
         } finally {
             close(rs, conn);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
index a5d2929605a..514e7b81638 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
@@ -28,7 +28,6 @@ import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class JdbcOracleClient extends JdbcClient {
 
@@ -56,78 +55,46 @@ public class JdbcOracleClient extends JdbcClient {
         List<String> databaseNames = Lists.newArrayList();
         try {
             rs = conn.getMetaData().getSchemas(conn.getCatalog(), null);
-            List<String> tempDatabaseNames = Lists.newArrayList();
             while (rs.next()) {
-                String databaseName = rs.getString("TABLE_SCHEM");
-                if (isLowerCaseTableNames) {
-                    lowerDBToRealDB.put(databaseName.toLowerCase(), 
databaseName);
-                    databaseName = databaseName.toLowerCase();
-                } else {
-                    lowerDBToRealDB.put(databaseName, databaseName);
-                }
-                tempDatabaseNames.add(databaseName);
-            }
-            if (isOnlySpecifiedDatabase) {
-                for (String db : tempDatabaseNames) {
-                    // Exclude database map take effect with higher priority 
over include database map
-                    if (!excludeDatabaseMap.isEmpty() && 
excludeDatabaseMap.containsKey(db)) {
-                        continue;
-                    }
-                    if (!includeDatabaseMap.isEmpty() && 
!includeDatabaseMap.containsKey(db)) {
-                        continue;
-                    }
-                    databaseNames.add(db);
-                }
-            } else {
-                databaseNames = tempDatabaseNames;
+                databaseNames.add(rs.getString("TABLE_SCHEM"));
             }
         } catch (SQLException e) {
             throw new JdbcClientException("failed to get database name list 
from jdbc", e);
         } finally {
             close(rs, conn);
         }
-        return databaseNames;
+        return filterDatabaseNames(databaseNames);
     }
 
     @Override
-    public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String 
tableName) {
+    public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String 
localTableName) {
         Connection conn = getConnection();
         ResultSet rs = null;
         List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
-        String finalDbName = getRealDatabaseName(dbName);
-        String finalTableName = getRealTableName(dbName, tableName);
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
         try {
             DatabaseMetaData databaseMetaData = conn.getMetaData();
             String catalogName = getCatalogName(conn);
             String modifiedTableName;
             boolean isModify = false;
-            if (finalTableName.contains("/")) {
-                modifiedTableName = modifyTableNameIfNecessary(finalTableName);
-                isModify = !modifiedTableName.equals(finalTableName);
+            if (remoteTableName.contains("/")) {
+                modifiedTableName = 
modifyTableNameIfNecessary(remoteTableName);
+                isModify = !modifiedTableName.equals(remoteTableName);
                 if (isModify) {
-                    rs = getColumns(databaseMetaData, catalogName, 
finalDbName, modifiedTableName);
+                    rs = getRemoteColumns(databaseMetaData, catalogName, 
remoteDbName, modifiedTableName);
                 } else {
-                    rs = getColumns(databaseMetaData, catalogName, 
finalDbName, finalTableName);
+                    rs = getRemoteColumns(databaseMetaData, catalogName, 
remoteDbName, remoteTableName);
                 }
             } else {
-                rs = getColumns(databaseMetaData, catalogName, finalDbName, 
finalTableName);
+                rs = getRemoteColumns(databaseMetaData, catalogName, 
remoteDbName, remoteTableName);
             }
             while (rs.next()) {
-                if (isModify && isTableModified(rs.getString("TABLE_NAME"), 
finalTableName)) {
+                if (isModify && isTableModified(rs.getString("TABLE_NAME"), 
remoteTableName)) {
                     continue;
                 }
-                lowerColumnToRealColumn.putIfAbsent(finalDbName, new 
ConcurrentHashMap<>());
-                
lowerColumnToRealColumn.get(finalDbName).putIfAbsent(finalTableName, new 
ConcurrentHashMap<>());
                 JdbcFieldSchema field = new JdbcFieldSchema();
-                String columnName = rs.getString("COLUMN_NAME");
-                if (isLowerCaseTableNames) {
-                    
lowerColumnToRealColumn.get(finalDbName).get(finalTableName)
-                            .put(columnName.toLowerCase(), columnName);
-                    columnName = columnName.toLowerCase();
-                } else {
-                    
lowerColumnToRealColumn.get(finalDbName).get(finalTableName).put(columnName, 
columnName);
-                }
-                field.setColumnName(columnName);
+                field.setColumnName(rs.getString("COLUMN_NAME"));
                 field.setDataType(rs.getInt("DATA_TYPE"));
                 field.setDataTypeName(rs.getString("TYPE_NAME"));
                 /*
@@ -149,7 +116,7 @@ public class JdbcOracleClient extends JdbcClient {
                 tableSchema.add(field);
             }
         } catch (SQLException e) {
-            throw new JdbcClientException("failed to get table name list from 
jdbc for table %s:%s", e, finalTableName,
+            throw new JdbcClientException("failed to get table name list from 
jdbc for table %s:%s", e, remoteTableName,
                 Util.getRootCauseMessage(e));
         } finally {
             close(rs, conn);
@@ -158,8 +125,8 @@ public class JdbcOracleClient extends JdbcClient {
     }
 
     @Override
-    protected String modifyTableNameIfNecessary(String tableName) {
-        return tableName.replace("/", "%");
+    protected String modifyTableNameIfNecessary(String remoteTableName) {
+        return remoteTableName.replace("/", "%");
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
new file mode 100644
index 00000000000..c42f5fca28f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.mapping;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.qe.GlobalVariable;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class IdentifierMapping {
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final ConcurrentHashMap<String, String> localDBToRemoteDB = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> 
localTableToRemoteTable
+            = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
ConcurrentHashMap<String, String>>>
+            localColumnToRemoteColumn = new ConcurrentHashMap<>();
+
+    private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false);
+    private final ConcurrentHashMap<String, AtomicBoolean> tableNamesLoadedMap 
= new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
AtomicBoolean>> columnNamesLoadedMap
+            = new ConcurrentHashMap<>();
+
+    private final boolean isLowerCaseMetaNames;
+    private final String metaNamesMapping;
+
+    public IdentifierMapping(boolean isLowerCaseMetaNames, String 
metaNamesMapping) {
+        this.isLowerCaseMetaNames = isLowerCaseMetaNames;
+        this.metaNamesMapping = metaNamesMapping;
+    }
+
+    public List<String> setDatabaseNameMapping(List<String> 
remoteDatabaseNames) {
+        JsonNode databasesNode = readAndParseJson(metaNamesMapping, 
"databases");
+
+        Map<String, String> databaseNameMapping = Maps.newTreeMap();
+        if (databasesNode.isArray()) {
+            for (JsonNode node : databasesNode) {
+                String remoteDatabase = node.path("remoteDatabase").asText();
+                String mapping = node.path("mapping").asText();
+                databaseNameMapping.put(remoteDatabase, mapping);
+            }
+        }
+
+        Map<String, List<String>> result = 
nameListToMapping(remoteDatabaseNames, localDBToRemoteDB,
+                databaseNameMapping, isLowerCaseMetaNames);
+        List<String> localDatabaseNames = result.get("localNames");
+        List<String> conflictNames = result.get("conflictNames");
+        if (!conflictNames.isEmpty()) {
+            throw new RuntimeException(
+                    "Conflict database/schema names found when 
lower_case_meta_names is true: " + conflictNames
+                            + ". Please set lower_case_meta_names to false or"
+                            + " use meta_name_mapping to specify the names.");
+        }
+        return localDatabaseNames;
+    }
+
+    public List<String> setTableNameMapping(String remoteDbName, List<String> 
remoteTableNames) {
+        JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables");
+
+        Map<String, String> tableNameMapping = Maps.newTreeMap();
+        if (tablesNode.isArray()) {
+            for (JsonNode node : tablesNode) {
+                String remoteDatabase = node.path("remoteDatabase").asText();
+                if (remoteDbName.equals(remoteDatabase)) {
+                    String remoteTable = node.path("remoteTable").asText();
+                    String mapping = node.path("mapping").asText();
+                    tableNameMapping.put(remoteTable, mapping);
+                }
+            }
+        }
+
+        localTableToRemoteTable.putIfAbsent(remoteDbName, new 
ConcurrentHashMap<>());
+
+        List<String> localTableNames;
+        List<String> conflictNames;
+
+        if (GlobalVariable.lowerCaseTableNames == 1) {
+            Map<String, List<String>> result = 
nameListToMapping(remoteTableNames,
+                    localTableToRemoteTable.get(remoteDbName),
+                    tableNameMapping, true);
+            localTableNames = result.get("localNames");
+            conflictNames = result.get("conflictNames");
+            if (!conflictNames.isEmpty()) {
+                throw new RuntimeException(
+                        "Conflict table names found in remote database/schema: 
" + remoteDbName
+                                + " when lower_case_table_names is 1: " + 
conflictNames
+                                + ". Please use meta_name_mapping to specify 
the names.");
+            }
+        } else {
+            Map<String, List<String>> result = 
nameListToMapping(remoteTableNames,
+                    localTableToRemoteTable.get(remoteDbName),
+                    tableNameMapping, isLowerCaseMetaNames);
+            localTableNames = result.get("localNames");
+            conflictNames = result.get("conflictNames");
+
+            if (!conflictNames.isEmpty()) {
+                throw new RuntimeException(
+                        "Conflict table names found in remote database/schema: 
" + remoteDbName
+                                + "when lower_case_meta_names is true: " + 
conflictNames
+                                + ". Please set lower_case_meta_names to false 
or"
+                                + " use meta_name_mapping to specify the table 
names.");
+            }
+        }
+        return localTableNames;
+    }
+
+    public List<Column> setColumnNameMapping(String remoteDbName, String 
remoteTableName, List<Column> remoteColumns) {
+        JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns");
+
+        Map<String, String> columnNameMapping = Maps.newTreeMap();
+        if (tablesNode.isArray()) {
+            for (JsonNode node : tablesNode) {
+                String remoteDatabase = node.path("remoteDatabase").asText();
+                String remoteTable = node.path("remoteTable").asText();
+                if (remoteDbName.equals(remoteDatabase) && 
remoteTable.equals(remoteTableName)) {
+                    String remoteColumn = node.path("remoteColumn").asText();
+                    String mapping = node.path("mapping").asText();
+                    columnNameMapping.put(remoteColumn, mapping);
+                }
+            }
+        }
+        localColumnToRemoteColumn.putIfAbsent(remoteDbName, new 
ConcurrentHashMap<>());
+        
localColumnToRemoteColumn.get(remoteDbName).putIfAbsent(remoteTableName, new 
ConcurrentHashMap<>());
+
+        List<String> localColumnNames;
+        List<String> conflictNames;
+
+        // Get the name from localColumns and save it to List<String>
+        List<String> remoteColumnNames = Lists.newArrayList();
+        for (Column remoteColumn : remoteColumns) {
+            remoteColumnNames.add(remoteColumn.getName());
+        }
+
+        Map<String, List<String>> result = nameListToMapping(remoteColumnNames,
+                
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName),
+                columnNameMapping, isLowerCaseMetaNames);
+        localColumnNames = result.get("localNames");
+        conflictNames = result.get("conflictNames");
+        if (!conflictNames.isEmpty()) {
+            throw new RuntimeException(
+                    "Conflict column names found in remote database/schema: " 
+ remoteDbName
+                            + " in remote table: " + remoteTableName
+                            + " when lower_case_meta_names is true: " + 
conflictNames
+                            + ". Please set lower_case_meta_names to false or"
+                            + " use meta_name_mapping to specify the column 
names.");
+        }
+        // Replace the name in remoteColumns with localColumnNames
+        for (int i = 0; i < remoteColumns.size(); i++) {
+            remoteColumns.get(i).setName(localColumnNames.get(i));
+        }
+        return remoteColumns;
+    }
+
+    public String getRemoteDatabaseName(String localDbName) {
+        if (localDBToRemoteDB.isEmpty() || 
!localDBToRemoteDB.containsKey(localDbName)) {
+            loadDatabaseNamesIfNeeded();
+        }
+        return localDBToRemoteDB.get(localDbName);
+    }
+
+    public String getRemoteTableName(String localDbName, String 
localTableName) {
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        if (localTableToRemoteTable.isEmpty()
+                || !localTableToRemoteTable.containsKey(remoteDbName)
+                || localTableToRemoteTable.get(remoteDbName) == null
+                || localTableToRemoteTable.get(remoteDbName).isEmpty()
+                || 
!localTableToRemoteTable.get(remoteDbName).containsKey(localTableName)
+                || 
localTableToRemoteTable.get(remoteDbName).get(localTableName) == null) {
+            loadTableNamesIfNeeded(localDbName);
+        }
+
+        return localTableToRemoteTable.get(remoteDbName).get(localTableName);
+    }
+
+    public Map<String, String> getRemoteColumnNames(String localDbName, String 
localTableName) {
+        String remoteDbName = getRemoteDatabaseName(localDbName);
+        String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
+        if (localColumnToRemoteColumn.isEmpty()
+                || !localColumnToRemoteColumn.containsKey(remoteDbName)
+                || localColumnToRemoteColumn.get(remoteDbName) == null
+                || localColumnToRemoteColumn.get(remoteDbName).isEmpty()
+                || 
!localColumnToRemoteColumn.get(remoteDbName).containsKey(remoteTableName)
+                || 
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName) == null
+                || localColumnToRemoteColumn.get(
+                remoteDbName).get(remoteTableName).isEmpty()) {
+            loadColumnNamesIfNeeded(localDbName, localTableName);
+        }
+        return 
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName);
+    }
+
+    private void loadDatabaseNamesIfNeeded() {
+        if (dbNamesLoaded.compareAndSet(false, true)) {
+            loadDatabaseNames();
+        }
+    }
+
+    private void loadTableNamesIfNeeded(String localDbName) {
+        AtomicBoolean isLoaded = 
tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false));
+        if (isLoaded.compareAndSet(false, true)) {
+            loadTableNames(localDbName);
+        }
+    }
+
+    private void loadColumnNamesIfNeeded(String localDbName, String 
localTableName) {
+        columnNamesLoadedMap.putIfAbsent(localDbName, new 
ConcurrentHashMap<>());
+        AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName)
+                .computeIfAbsent(localTableName, k -> new 
AtomicBoolean(false));
+        if (isLoaded.compareAndSet(false, true)) {
+            loadColumnNames(localDbName, localTableName);
+        }
+    }
+
+    // Load the database name from the data source.
+    // In the corresponding getDatabaseNameList(), setDatabaseNameMapping() 
must be used to update the mapping.
+    protected abstract void loadDatabaseNames();
+
+    // Load the table names for the specified database from the data source.
+    // In the corresponding getTableNameList(), setTableNameMapping() must be 
used to update the mapping.
+    protected abstract void loadTableNames(String localDbName);
+
+    // Load the column names for a specified table in a database from the data 
source.
+    // In the corresponding getColumnNameList(), setColumnNameMapping() must 
be used to update the mapping.
+    protected abstract void loadColumnNames(String localDbName, String 
localTableName);
+
+    private JsonNode readAndParseJson(String jsonPath, String nodeName) {
+        JsonNode rootNode;
+        try {
+            rootNode = mapper.readTree(jsonPath);
+            return rootNode.path(nodeName);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("parse meta_names_mapping property 
error", e);
+        }
+    }
+
+    private Map<String, List<String>> nameListToMapping(List<String> 
remoteNames,
+            ConcurrentHashMap<String, String> localNameToRemoteName,
+            Map<String, String> nameMapping, boolean isLowerCaseMetaNames) {
+        List<String> filteredDatabaseNames = Lists.newArrayList();
+        Set<String> lowerCaseNames = Sets.newHashSet();
+        Map<String, List<String>> nameMap = Maps.newHashMap();
+        List<String> conflictNames = Lists.newArrayList();
+
+        for (String name : remoteNames) {
+            String mappedName = nameMapping.getOrDefault(name, name);
+            String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() 
: mappedName;
+
+            // Use computeIfAbsent to ensure atomicity
+            localNameToRemoteName.computeIfAbsent(localName, k -> name);
+
+            if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) {
+                if (nameMap.containsKey(localName)) {
+                    nameMap.get(localName).add(mappedName);
+                }
+            } else {
+                nameMap.putIfAbsent(localName, 
Lists.newArrayList(Collections.singletonList(mappedName)));
+            }
+
+            filteredDatabaseNames.add(localName);
+        }
+
+        for (List<String> conflictNameList : nameMap.values()) {
+            if (conflictNameList.size() > 1) {
+                conflictNames.addAll(conflictNameList);
+            }
+        }
+
+        Map<String, List<String>> result = Maps.newConcurrentMap();
+        result.put("localNames", filteredDatabaseNames);
+        result.put("conflictNames", conflictNames);
+        return result;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java
index d2731d33e13..6369f180d32 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcScanNode.java
@@ -81,7 +81,7 @@ public class JdbcScanNode extends ExternalScanNode {
             tbl = (JdbcTable) (desc.getTable());
         }
         jdbcType = tbl.getJdbcTableType();
-        tableName = tbl.getProperRealFullTableName(jdbcType);
+        tableName = tbl.getProperRemoteFullTableName(jdbcType);
     }
 
     @Override
@@ -131,7 +131,7 @@ public class JdbcScanNode extends ExternalScanNode {
         for (SlotRef slotRef : slotRefs) {
             SlotRef slotRef1 = (SlotRef) slotRef.clone();
             slotRef1.setTblName(null);
-            slotRef1.setLabel(JdbcTable.properNameWithRealName(jdbcType, 
slotRef1.getColumnName()));
+            slotRef1.setLabel(JdbcTable.properNameWithRemoteName(jdbcType, 
slotRef1.getColumnName()));
             sMap.put(slotRef, slotRef1);
         }
 
@@ -171,7 +171,7 @@ public class JdbcScanNode extends ExternalScanNode {
                 continue;
             }
             Column col = slot.getColumn();
-            columns.add(tbl.getProperRealColumnName(jdbcType, col.getName()));
+            columns.add(tbl.getProperRemoteColumnName(jdbcType, 
col.getName()));
         }
         if (columns.isEmpty()) {
             columns.add("*");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
index f55821953e2..68a0c04a240 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java
@@ -48,7 +48,7 @@ public class JdbcTableSink extends DataSink {
     public JdbcTableSink(JdbcTable jdbcTable, List<String> insertCols) {
         this.jdbcTable = jdbcTable;
         jdbcType = jdbcTable.getJdbcTableType();
-        externalTableName = jdbcTable.getProperRealFullTableName(jdbcType);
+        externalTableName = jdbcTable.getProperRemoteFullTableName(jdbcType);
         useTransaction = 
ConnectContext.get().getSessionVariable().isEnableOdbcTransaction();
         dorisTableName = jdbcTable.getName();
         insertSql = jdbcTable.getInsertSql(insertCols);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
index 8394daf0682..49c0e07732b 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
@@ -41,7 +41,24 @@ public class JdbcExternalCatalogTest {
         properties.put(JdbcResource.DRIVER_URL, "ojdbc8.jar");
         properties.put(JdbcResource.JDBC_URL, 
"jdbc:oracle:thin:@127.0.0.1:1521:XE");
         properties.put(JdbcResource.DRIVER_CLASS, 
"oracle.jdbc.driver.OracleDriver");
-        jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", null, 
properties, "testComment");
+        jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", null, 
properties, "testComment", false);
+    }
+
+    @Test
+    public void testProcessCompatibleProperties() throws DdlException {
+        // Create a properties map with lower_case_table_names
+        Map<String, String> inputProps = new HashMap<>();
+        inputProps.put("lower_case_table_names", "true");
+
+        // Call processCompatibleProperties
+        Map<String, String> resultProps = 
jdbcExternalCatalog.processCompatibleProperties(inputProps, true);
+
+        // Assert that lower_case_meta_names is present and has the correct 
value
+        Assert.assertTrue(resultProps.containsKey("lower_case_meta_names"));
+        Assert.assertEquals("true", resultProps.get("lower_case_meta_names"));
+
+        // Assert that lower_case_table_names is not present
+        Assert.assertFalse(resultProps.containsKey("lower_case_table_names"));
     }
 
     @Test
@@ -69,13 +86,13 @@ public class JdbcExternalCatalogTest {
                 exception1.getMessage());
 
         
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE,
 "true");
-        
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.LOWER_CASE_TABLE_NAMES,
 "1");
+        
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.LOWER_CASE_META_NAMES,
 "1");
         Exception exception2 = Assert.assertThrows(DdlException.class, () -> 
jdbcExternalCatalog.checkProperties());
-        Assert.assertEquals("errCode = 2, detailMessage = 
lower_case_table_names must be true or false",
+        Assert.assertEquals("errCode = 2, detailMessage = 
lower_case_meta_names must be true or false",
                 exception2.getMessage());
 
         
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.ONLY_SPECIFIED_DATABASE,
 "false");
-        
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.LOWER_CASE_TABLE_NAMES,
 "false");
+        
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.LOWER_CASE_META_NAMES,
 "false");
         
jdbcExternalCatalog.getCatalogProperty().addProperty(JdbcResource.INCLUDE_DATABASE_LIST,
 "db1,db2");
         DdlException exceptione3 = Assert.assertThrows(DdlException.class, () 
-> jdbcExternalCatalog.checkProperties());
         Assert.assertEquals(
diff --git 
a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out 
b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
index 8aa7c81d763..2a636532069 100644
--- a/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_mysql_jdbc_catalog.out
@@ -274,6 +274,9 @@ doris_test
 information_schema
 
 -- !specified_database_3 --
+DORIS
+Doris
+doris
 information_schema
 init_db
 mysql
@@ -413,3 +416,28 @@ year       SMALLINT        Yes     false   \N      NONE
 -- !auto_default_t2 --
 0
 
+-- !sql --
+doris_1
+doris_2
+doris_3
+doris_test
+information_schema
+init_db
+mysql
+performance_schema
+show_test_do_not_modify
+sys
+
+-- !sql --
+doris_1
+doris_2
+doris_3
+
+-- !sql --
+DORIS
+
+-- !sql --
+Doris
+
+-- !sql --
+doris
diff --git 
a/regression-test/data/external_table_p0/jdbc/test_oracle_jdbc_catalog.out 
b/regression-test/data/external_table_p0/jdbc/test_oracle_jdbc_catalog.out
index d835e1c0c48..3c0a07f0d4e 100644
--- a/regression-test/data/external_table_p0/jdbc/test_oracle_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_oracle_jdbc_catalog.out
@@ -258,3 +258,20 @@ information_schema
 -- !query_ad2 --
 1      alice
 
+-- !query_lower_desc --
+doris_1        TEXT    Yes     true    \N      
+doris_2        TEXT    Yes     true    \N      
+doris_3        TEXT    Yes     true    \N      
+
+-- !query_lower_all --
+DORIS  Doris   doris
+
+-- !query_lower_1 --
+DORIS
+
+-- !query_lower_2 --
+Doris
+
+-- !query_lower_3 --
+doris
+
diff --git 
a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_catalog.groovy 
b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_catalog.groovy
index 473c418b027..d990c12b36b 100644
--- 
a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_catalog.groovy
+++ 
b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_catalog.groovy
@@ -549,6 +549,58 @@ suite("test_mysql_jdbc_catalog", 
"p0,external,mysql,external_docker,external_doc
         }
         order_qt_auto_default_t2 """insert into ${auto_default_t}(name,dt) 
select col1, coalesce(col12,'2022-01-01 00:00:00') from ex_tb15 limit 1;"""
         sql """drop catalog if exists ${catalog_name} """
+
+        // test lower_case_meta_names
+
+        sql """ drop catalog if exists mysql_lower_case_catalog """
+        sql """ CREATE CATALOG mysql_lower_case_catalog PROPERTIES (
+                    "type"="jdbc",
+                    "user"="root",
+                    "password"="123456",
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "lower_case_meta_names" = "true",
+                    "meta_names_mapping" = '{"databases": [{"remoteDatabase": 
"DORIS","mapping": "doris_1"},{"remoteDatabase": "Doris","mapping": 
"doris_2"},{"remoteDatabase": "doris","mapping": "doris_3"}],"tables": 
[{"remoteDatabase": "Doris","remoteTable": "DORIS","mapping": 
"doris_1"},{"remoteDatabase": "Doris","remoteTable": "Doris","mapping": 
"doris_2"},{"remoteDatabase": "Doris","remoteTable": "doris","mapping": 
"doris_3"}]}'
+            );
+        """
+
+        qt_sql "show databases from mysql_lower_case_catalog;"
+        qt_sql "show tables from mysql_lower_case_catalog.doris_2;"
+        qt_sql "select * from mysql_lower_case_catalog.doris_2.doris_1 order 
by id;"
+        qt_sql "select * from mysql_lower_case_catalog.doris_2.doris_2 order 
by id;"
+        qt_sql "select * from mysql_lower_case_catalog.doris_2.doris_3 order 
by id;"
+
+        sql """ drop catalog if exists mysql_lower_case_catalog; """
+        sql """ drop catalog if exists mysql_lower_case_catalog2; """
+        test {
+            sql """ CREATE CATALOG mysql_lower_case_catalog2 PROPERTIES (
+                        "type"="jdbc",
+                        "user"="root",
+                        "password"="123456",
+                        "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false",
+                        "driver_url" = "${driver_url}",
+                        "driver_class" = "com.mysql.cj.jdbc.Driver",
+                        "lower_case_table_names" = "true",
+                        "meta_names_mapping" = '{"databases": 
[{"remoteDatabase": "DORIS","mapping": "doris_1"},{"remoteDatabase": 
"Doris","mapping": "doris_2"},{"remoteDatabase": "doris","mapping": 
"doris_3"}],"tables": [{"remoteDatabase": "Doris","remoteTable": 
"DORIS","mapping": "doris_1"},{"remoteDatabase": "Doris","remoteTable": 
"Doris","mapping": "doris_2"},{"remoteDatabase": "Doris","remoteTable": 
"doris","mapping": "doris_3"}]}'
+                    );
+                """
+            exception "Jdbc catalog property lower_case_table_names is not 
supported, please use lower_case_meta_names instead"
+            }
+        sql """ drop catalog if exists mysql_lower_case_catalog2; """
+        sql """ drop catalog if exists mysql_lower_case_catalog3; """
+        sql """ CREATE CATALOG mysql_lower_case_catalog3 PROPERTIES (
+                    "type"="jdbc",
+                    "user"="root",
+                    "password"="123456",
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "lower_case_meta_names" = "true",
+                    "meta_names_mapping" = "{\\\"databases\\\": 
[{\\\"remoteDatabase\\\": \\\"DORIS\\\",\\\"mapping\\\": 
\\\"doris_1\\\"},{\\\"remoteDatabase\\\": \\\"Doris\\\",\\\"mapping\\\": 
\\\"doris_2\\\"},{\\\"remoteDatabase\\\": \\\"doris\\\",\\\"mapping\\\": 
\\\"doris_3\\\"}],\\\"tables\\\": [{\\\"remoteDatabase\\\": 
\\\"Doris\\\",\\\"remoteTable\\\": \\\"DORIS\\\",\\\"mapping\\\": 
\\\"doris_1\\\"},{\\\"remoteDatabase\\\": \\\"Doris\\\",\\\"remoteTable\\\": 
\\\"Doris\\\",\\\"mapp [...]
+                    );
+                """
+        sql """ drop catalog if exists mysql_lower_case_catalog3; """
     }
 }
 
diff --git 
a/regression-test/suites/external_table_p0/jdbc/test_oracle_jdbc_catalog.groovy 
b/regression-test/suites/external_table_p0/jdbc/test_oracle_jdbc_catalog.groovy
index 59e4a506d81..226563beaaa 100644
--- 
a/regression-test/suites/external_table_p0/jdbc/test_oracle_jdbc_catalog.groovy
+++ 
b/regression-test/suites/external_table_p0/jdbc/test_oracle_jdbc_catalog.groovy
@@ -215,7 +215,7 @@ suite("test_oracle_jdbc_catalog", 
"p0,external,oracle,external_docker,external_d
         qt_specified_database   """ show databases; """
         sql """drop catalog if exists ${catalog_name} """
 
-        // test lower_case_table_names argument
+        // test lower_case_meta_names argument
         sql """create catalog if not exists ${catalog_name} properties(
                     "type"="jdbc",
                     "user"="doris_test",
@@ -223,7 +223,7 @@ suite("test_oracle_jdbc_catalog", 
"p0,external,oracle,external_docker,external_d
                     "jdbc_url" = 
"jdbc:oracle:thin:@${externalEnvIp}:${oracle_port}:${SID}",
                     "driver_url" = "${driver_url}",
                     "driver_class" = "oracle.jdbc.driver.OracleDriver",
-                    "lower_case_table_names" = "true"
+                    "lower_case_meta_names" = "true"
         );"""
         sql """ switch ${catalog_name} """
         sql """ use ${ex_db_name_lower_case}"""
@@ -248,11 +248,13 @@ suite("test_oracle_jdbc_catalog", 
"p0,external,oracle,external_docker,external_d
                     "jdbc_url" = 
"jdbc:oracle:thin:@${externalEnvIp}:${oracle_port}:${SID}",
                     "driver_url" = "${driver_url}",
                     "driver_class" = "oracle.jdbc.driver.OracleDriver",
-                    "lower_case_table_names" = "true"
+                    "lower_case_meta_names" = "true"
         );"""
         sql """ switch ${catalog_name} """
         qt_query_clob """ select * from doris_test.test_clob order by id; """
 
+        sql """drop catalog if exists ${catalog_name} """
+
         // test for `AA/D`
         sql """create catalog if not exists ${catalog_name} properties(
                     "type"="jdbc",
@@ -261,11 +263,32 @@ suite("test_oracle_jdbc_catalog", 
"p0,external,oracle,external_docker,external_d
                     "jdbc_url" = 
"jdbc:oracle:thin:@${externalEnvIp}:${oracle_port}:${SID}",
                     "driver_url" = "${driver_url}",
                     "driver_class" = "oracle.jdbc.driver.OracleDriver",
-                    "lower_case_table_names" = "true"
+                    "lower_case_meta_names" = "true"
         );"""
         sql """ switch ${catalog_name} """
         qt_query_ad1 """ select * from doris_test.`aa/d` order by id; """
         qt_query_ad2 """ select * from doris_test.aaad order by id; """
 
+        sql """drop catalog if exists ${catalog_name} """
+
+        // test for suffix column name
+        sql """create catalog if not exists ${catalog_name} properties(
+                    "type"="jdbc",
+                    "user"="doris_test",
+                    "password"="123456",
+                    "jdbc_url" = 
"jdbc:oracle:thin:@${externalEnvIp}:${oracle_port}:${SID}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "oracle.jdbc.driver.OracleDriver",
+                    "lower_case_meta_names" = "true",
+                    "meta_names_mapping" = '{"columns": [{"remoteDatabase": 
"DORIS_TEST","remoteTable": "LOWER_TEST","remoteColumn": "DORIS","mapping": 
"doris_1"},{"remoteDatabase": "DORIS_TEST","remoteTable": 
"LOWER_TEST","remoteColumn": "Doris","mapping": "doris_2"},{"remoteDatabase": 
"DORIS_TEST","remoteTable": "LOWER_TEST","remoteColumn": "doris","mapping": 
"doris_3"}]}'
+        );"""
+        sql """ switch ${catalog_name} """
+        qt_query_lower_desc """ desc doris_test.lower_test; """
+        qt_query_lower_all """ select * from doris_test.lower_test; """
+        qt_query_lower_1 """ select doris_1 from doris_test.lower_test; """
+        qt_query_lower_2 """ select doris_2 from doris_test.lower_test; """
+        qt_query_lower_3 """ select doris_3 from doris_test.lower_test; """
+
+        sql """drop catalog if exists ${catalog_name} """
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to