This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 557159d3ce [feature](JdbcExternalCatalog) support insert data in JdbcExternalCatalog (#16271)
557159d3ce is described below

commit 557159d3ceff022903839e45ab07d94c922d244d
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Thu Feb 2 17:31:33 2023 +0800

    [feature](JdbcExternalCatalog) support insert data in JdbcExternalCatalog (#16271)
---
 be/src/exec/table_connector.cpp                    |  9 +++--
 .../docker-compose/mysql/init/03-create-table.sql  |  5 +++
 .../docker-compose/oracle/init/03-create-table.sql |  6 ++++
 .../postgresql/init/02-create-table.sql            |  6 ++++
 .../java/org/apache/doris/analysis/InsertStmt.java | 39 ++++++++++++++++------
 .../doris/transaction/DatabaseTransactionMgr.java  |  3 +-
 .../doris/transaction/GlobalTransactionMgr.java    |  5 +--
 .../jdbc_catalog_p0/test_mysql_jdbc_catalog.out    | 13 ++++++++
 .../jdbc_catalog_p0/test_oracle_jdbc_catalog.out   | 13 ++++++++
 .../data/jdbc_catalog_p0/test_pg_jdbc_catalog.out  | 13 ++++++++
 .../jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy | 23 ++++++++++---
 .../test_oracle_jdbc_catalog.groovy                | 14 +++++++-
 .../jdbc_catalog_p0/test_pg_jdbc_catalog.groovy    | 13 ++++++++
 13 files changed, 140 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index 12dc3acdc2..30b01b1d03 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -226,8 +226,13 @@ Status TableConnector::convert_column_data(const 
vectorized::ColumnPtr& column_p
     case TYPE_VARCHAR:
     case TYPE_CHAR:
     case TYPE_STRING: {
-        // here need check the ' is used, now for pg array string must be "
-        fmt::format_to(_insert_stmt_buffer, "\"{}\"", 
fmt::basic_string_view(item, size));
+        // TODO(zhangstar333): check array data type of postgresql
+        // for oracle/pg database string must be '
+        if (table_type == TOdbcTableType::ORACLE || table_type == 
TOdbcTableType::POSTGRESQL) {
+            fmt::format_to(_insert_stmt_buffer, "'{}'", 
fmt::basic_string_view(item, size));
+        } else {
+            fmt::format_to(_insert_stmt_buffer, "\"{}\"", 
fmt::basic_string_view(item, size));
+        }
         break;
     }
     case TYPE_ARRAY: {
diff --git a/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql 
b/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql
index 02c257cbc8..8fb1aebc4b 100644
--- a/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql
+++ b/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql
@@ -223,4 +223,9 @@ create table doris_test.ex_tb20 (
     decimal_unsigned_long decimal(65, 5) unsigned
 ) engine=innodb charset=utf8;
 
+create table doris_test.test_insert (
+    `id` varchar(128) NULL,
+    `name` varchar(128) NULL,
+    `age` int NULL
+) engine=innodb charset=utf8;
 
diff --git a/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql 
b/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql
index d5dd8cf1c6..d2d8d6af7e 100644
--- a/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql
+++ b/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql
@@ -78,3 +78,9 @@ t4 timestamp,
 t5 interval year(3) to month,
 t6 interval day(3) to second(6)
 );
+
+create table doris_test.test_insert(
+id varchar2(128),
+name varchar2(128),
+age number(5)
+);
diff --git 
a/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql 
b/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql
index b721da297a..6ace3b20cb 100644
--- a/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql
+++ b/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql
@@ -150,3 +150,9 @@ CREATE TABLE catalog_pg_test.test12 (
    ID INT NOT NULL,
    uuid_value uuid
 );
+
+CREATE TABLE catalog_pg_test.test_insert (
+   id varchar(128),
+   name varchar(128),
+   age int
+);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 44140b24e9..891fe3349b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -31,6 +31,8 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.JdbcExternalDatabase;
+import org.apache.doris.catalog.external.JdbcExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
@@ -39,6 +41,8 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.JdbcExternalCatalog;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.DataSink;
@@ -110,7 +114,7 @@ public class InsertStmt extends DdlStmt {
 
     private Table targetTable;
 
-    private Database db;
+    private DatabaseIf db;
     private long transactionId;
 
     // we need a new TupleDesc for olap table.
@@ -191,12 +195,15 @@ public class InsertStmt extends DdlStmt {
         // get dbs of statement
         queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet);
         tblName.analyze(analyzer);
-        // disallow external catalog
-        Util.prohibitExternalCatalog(tblName.getCtl(), 
this.getClass().getSimpleName());
+        // disallow external catalog except JdbcExternalCatalog
+        if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog
+                && !(analyzer.getEnv().getCurrentCatalog() instanceof 
JdbcExternalCatalog)) {
+            Util.prohibitExternalCatalog(tblName.getCtl(), 
this.getClass().getSimpleName());
+        }
         String dbName = tblName.getDb();
         String tableName = tblName.getTbl();
         // check exist
-        DatabaseIf db = 
analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbName);
+        DatabaseIf db = 
analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(dbName);
         TableIf table = db.getTableOrAnalysisException(tblName.getTbl());
 
         // check access
@@ -247,7 +254,7 @@ public class InsertStmt extends DdlStmt {
         return dataSink;
     }
 
-    public Database getDbObj() {
+    public DatabaseIf getDbObj() {
         return db;
     }
 
@@ -261,8 +268,11 @@ public class InsertStmt extends DdlStmt {
 
         if (targetTable == null) {
             tblName.analyze(analyzer);
-            // disallow external catalog
-            Util.prohibitExternalCatalog(tblName.getCtl(), 
this.getClass().getSimpleName());
+            // disallow external catalog except JdbcExternalCatalog
+            if (analyzer.getEnv().getCurrentCatalog() instanceof 
ExternalCatalog
+                    && !(analyzer.getEnv().getCurrentCatalog() instanceof 
JdbcExternalCatalog)) {
+                Util.prohibitExternalCatalog(tblName.getCtl(), 
this.getClass().getSimpleName());
+            }
         }
 
         // Check privilege
@@ -292,8 +302,7 @@ public class InsertStmt extends DdlStmt {
         // create data sink
         createDataSink();
 
-        db = 
analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(tblName.getDb());
-
+        db = 
analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
         // create label and begin transaction
         long timeoutSecond = 
ConnectContext.get().getSessionVariable().getQueryTimeoutS();
         if (Strings.isNullOrEmpty(label)) {
@@ -322,8 +331,16 @@ public class InsertStmt extends DdlStmt {
     private void analyzeTargetTable(Analyzer analyzer) throws 
AnalysisException {
         // Get table
         if (targetTable == null) {
-            DatabaseIf db = 
Env.getCurrentInternalCatalog().getDbOrAnalysisException(tblName.getDb());
-            targetTable = (Table) 
db.getTableOrAnalysisException(tblName.getTbl());
+            DatabaseIf db = analyzer.getEnv().getCatalogMgr()
+                            
.getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
+            if (db instanceof Database) {
+                targetTable = (Table) 
db.getTableOrAnalysisException(tblName.getTbl());
+            } else if (db instanceof JdbcExternalDatabase) {
+                JdbcExternalTable jdbcTable = (JdbcExternalTable) 
db.getTableOrAnalysisException(tblName.getTbl());
+                targetTable = jdbcTable.getJdbcTable();
+            } else {
+                throw new AnalysisException("Not support insert target 
table.");
+            }
         }
 
         if (targetTable instanceof OlapTable) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 347bb33205..dec2a4298f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -18,6 +18,7 @@
 package org.apache.doris.transaction;
 
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
@@ -664,7 +665,7 @@ public class DatabaseTransactionMgr {
         LOG.info("transaction:[{}] successfully committed", transactionState);
     }
 
-    public boolean waitForTransactionFinished(Database db, long transactionId, 
long timeoutMillis)
+    public boolean waitForTransactionFinished(DatabaseIf db, long 
transactionId, long timeoutMillis)
             throws TransactionCommitFailedException {
         TransactionState transactionState = null;
         readLock();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index f12e219771..3c84056789 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -18,6 +18,7 @@
 package org.apache.doris.transaction;
 
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
@@ -243,13 +244,13 @@ public class GlobalTransactionMgr implements Writable {
         dbTransactionMgr.commitTransaction(null, transactionId, null, null, 
true);
     }
 
-    public boolean commitAndPublishTransaction(Database db, List<Table> 
tableList, long transactionId,
+    public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> 
tableList, long transactionId,
             List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis)
             throws UserException {
         return commitAndPublishTransaction(db, tableList, transactionId, 
tabletCommitInfos, timeoutMillis, null);
     }
 
-    public boolean commitAndPublishTransaction(Database db, List<Table> 
tableList, long transactionId,
+    public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> 
tableList, long transactionId,
             List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
             TxnCommitAttachment txnCommitAttachment)
             throws UserException {
diff --git a/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out 
b/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out
index 1d7cfa7a41..22a7349c64 100644
--- a/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out
+++ b/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out
@@ -164,6 +164,19 @@ bca        2022-11-02      2022-11-02      8012    vivo
 1.12345        1.12345 1.12345 1.12345 1.12345 1.12345
 123456789012345678901234567890123.12345        
12345678901234567890123456789012.12345  
1234567890123456789012345678901234.12345        
123456789012345678901234567890123.12345 
123456789012345678901234567890123456789012345678901234567890.12345      
123456789012345678901234567890123456789012345678901234567890.12345
 
+-- !test_insert1 --
+doris1 18
+
+-- !test_insert2 --
+doris2 19
+doris3 20
+
+-- !test_insert3 --
+doris2 19
+doris2 19
+doris3 20
+doris3 20
+
 -- !ex_tb1 --
 {"k1":"v1", "k2":"v2"}
 
diff --git a/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out 
b/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out
index 9f45b5584c..cc5c0f50f2 100644
--- a/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out
+++ b/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out
@@ -41,3 +41,16 @@
 7      \N      \N      \N      \N      223-9   \N
 8      \N      \N      \N      \N      \N      12 10:23:1.123457
 
+-- !test_insert1 --
+doris1 18
+
+-- !test_insert2 --
+doris2 19
+doris3 20
+
+-- !test_insert3 --
+doris2 19
+doris2 19
+doris3 20
+doris3 20
+
diff --git a/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out 
b/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out
index 3e1c7f8058..ac0ff72029 100644
--- a/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out
+++ b/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out
@@ -2136,6 +2136,19 @@ true     abc     def     2022-10-11      1.234   1       
2       99      2022-10-22T10:59:59     34.123
 1      980dd890-f7fe-4fff-999d-873516108b2e
 2      980dd890-f7fe-4fff-999d-873516108b2e
 
+-- !test_insert1 --
+doris1 18
+
+-- !test_insert2 --
+doris2 19
+doris3 20
+
+-- !test_insert3 --
+doris2 19
+doris2 19
+doris3 20
+doris3 20
+
 -- !test_old --
 123    abc
 123    abc
diff --git 
a/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy 
b/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy
index 656fe06848..7132b638c1 100644
--- a/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy
+++ b/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy
@@ -45,6 +45,7 @@ suite("test_mysql_jdbc_catalog", "p0") {
         String ex_tb18 = "ex_tb18";
         String ex_tb19 = "ex_tb19";
         String ex_tb20 = "ex_tb20";
+        String test_insert = "test_insert";
 
         sql """ADMIN SET FRONTEND CONFIG ("enable_decimal_conversion" = 
"true");"""
         sql """drop catalog if exists ${catalog_name} """
@@ -101,8 +102,20 @@ suite("test_mysql_jdbc_catalog", "p0") {
         order_qt_ex_tb19  """ select * from ${ex_tb19} order by date_value; """
         order_qt_ex_tb20  """ select * from ${ex_tb20} order by 
decimal_normal; """
 
-        sql """drop catalog if exists ${catalog_name} """
-        sql """drop resource if exists ${resource_name}"""
+        // test insert
+        String uuid1 = UUID.randomUUID().toString();
+        sql """ insert into ${test_insert} values ('${uuid1}', 'doris1', 18) 
"""
+        order_qt_test_insert1 """ select name, age from ${test_insert} where 
id = '${uuid1}' order by age """
+
+        String uuid2 = UUID.randomUUID().toString();
+        sql """ insert into ${test_insert} values ('${uuid2}', 'doris2', 19), 
('${uuid2}', 'doris3', 20) """
+        order_qt_test_insert2 """ select name, age from ${test_insert} where 
id = '${uuid2}' order by age """
+
+        sql """ insert into ${test_insert} select * from ${test_insert} where 
id = '${uuid2}' """
+        order_qt_test_insert3 """ select name, age from ${test_insert} where 
id = '${uuid2}' order by age """
+
+        sql """ drop catalog if exists ${catalog_name} """
+        sql """ drop resource if exists ${resource_name} """
 
         // test old create-catalog syntax for compatibility
         sql """ CREATE CATALOG ${catalog_name} PROPERTIES (
@@ -113,9 +126,9 @@ suite("test_mysql_jdbc_catalog", "p0") {
             "jdbc.driver_url" = 
"https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar",
             "jdbc.driver_class" = "com.mysql.cj.jdbc.Driver");
         """
-        sql """switch ${catalog_name}"""
-        sql """use ${ex_db_name}"""
+        sql """ switch ${catalog_name} """
+        sql """ use ${ex_db_name} """
         order_qt_ex_tb1  """ select * from ${ex_tb1} order by id; """
-        sql """drop resource if exists ${resource_name}"""
+        sql """ drop catalog if exists ${catalog_name} """
     }
 }
diff --git 
a/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy 
b/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy
index 51a7b174f7..7ddcce6c10 100644
--- a/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy
+++ b/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy
@@ -23,7 +23,8 @@ suite("test_oracle_jdbc_catalog", "p0") {
         String internal_db_name = "regression_test_jdbc_catalog_p0";
         String ex_db_name = "DORIS_TEST";
         String oracle_port = context.config.otherConfigs.get("oracle_11_port");
-        String SID = "XE"
+        String SID = "XE";
+        String test_insert = "TEST_INSERT";
 
         String inDorisTable = "doris_in_tb";
 
@@ -68,6 +69,17 @@ suite("test_oracle_jdbc_catalog", "p0") {
         // So instead of qt, we're using sql here.
         sql  """ select * from TEST_RAW order by ID; """
 
+        // test insert
+        String uuid1 = UUID.randomUUID().toString();
+        sql """ insert into ${test_insert} values ('${uuid1}', 'doris1', 18) 
"""
+        order_qt_test_insert1 """ select name, age from ${test_insert} where 
id = '${uuid1}' order by age """
+
+        String uuid2 = UUID.randomUUID().toString();
+        sql """ insert into ${test_insert} values ('${uuid2}', 'doris2', 19), 
('${uuid2}', 'doris3', 20) """
+        order_qt_test_insert2 """ select name, age from ${test_insert} where 
id = '${uuid2}' order by age """
+
+        sql """ insert into ${test_insert} select * from ${test_insert} where 
id = '${uuid2}' """
+        order_qt_test_insert3 """ select name, age from ${test_insert} where 
id = '${uuid2}' order by age """
 
         sql """drop catalog if exists ${catalog_name} """
         sql """drop resource if exists jdbc_resource_catalog_pg"""
diff --git a/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy 
b/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy
index 71462cc307..883d15be9e 100644
--- a/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy
+++ b/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy
@@ -25,6 +25,7 @@ suite("test_pg_jdbc_catalog", "p0") {
         String ex_schema_name2 = "catalog_pg_test";
         String pg_port = context.config.otherConfigs.get("pg_14_port");
         String inDorisTable = "doris_in_tb";
+        String test_insert = "test_insert";
 
         sql """drop catalog if exists ${catalog_name} """
         sql """drop resource if exists ${resource_name}"""
@@ -73,6 +74,18 @@ suite("test_pg_jdbc_catalog", "p0") {
         order_qt_test13  """ select * from test11 order by id; """
         order_qt_test14  """ select * from test12 order by id; """
 
+        // test insert
+        String uuid1 = UUID.randomUUID().toString();
+        sql """ insert into ${test_insert} values ('${uuid1}', 'doris1', 18) 
"""
+        order_qt_test_insert1 """ select name, age from ${test_insert} where 
id = '${uuid1}' order by age """
+
+        String uuid2 = UUID.randomUUID().toString();
+        sql """ insert into ${test_insert} values ('${uuid2}', 'doris2', 19), 
('${uuid2}', 'doris3', 20) """
+        order_qt_test_insert2 """ select name, age from ${test_insert} where 
id = '${uuid2}' order by age """
+
+        sql """ insert into ${test_insert} select * from ${test_insert} where 
id = '${uuid2}' """
+        order_qt_test_insert3 """ select name, age from ${test_insert} where 
id = '${uuid2}' order by age """
+
         sql """drop catalog if exists ${catalog_name} """
         sql """drop resource if exists ${resource_name}"""
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to