This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 557159d3ce [feature](JdbcExternalCatalog) support insert data in JdbcExternalCatalog (#16271) 557159d3ce is described below commit 557159d3ceff022903839e45ab07d94c922d244d Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Thu Feb 2 17:31:33 2023 +0800 [feature](JdbcExternalCatalog) support insert data in JdbcExternalCatalog (#16271) --- be/src/exec/table_connector.cpp | 9 +++-- .../docker-compose/mysql/init/03-create-table.sql | 5 +++ .../docker-compose/oracle/init/03-create-table.sql | 6 ++++ .../postgresql/init/02-create-table.sql | 6 ++++ .../java/org/apache/doris/analysis/InsertStmt.java | 39 ++++++++++++++++------ .../doris/transaction/DatabaseTransactionMgr.java | 3 +- .../doris/transaction/GlobalTransactionMgr.java | 5 +-- .../jdbc_catalog_p0/test_mysql_jdbc_catalog.out | 13 ++++++++ .../jdbc_catalog_p0/test_oracle_jdbc_catalog.out | 13 ++++++++ .../data/jdbc_catalog_p0/test_pg_jdbc_catalog.out | 13 ++++++++ .../jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy | 23 ++++++++++--- .../test_oracle_jdbc_catalog.groovy | 14 +++++++- .../jdbc_catalog_p0/test_pg_jdbc_catalog.groovy | 13 ++++++++ 13 files changed, 140 insertions(+), 22 deletions(-) diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp index 12dc3acdc2..30b01b1d03 100644 --- a/be/src/exec/table_connector.cpp +++ b/be/src/exec/table_connector.cpp @@ -226,8 +226,13 @@ Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_p case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { - // here need check the ' is used, now for pg array string must be " - fmt::format_to(_insert_stmt_buffer, "\"{}\"", fmt::basic_string_view(item, size)); + // TODO(zhangstar333): check array data type of postgresql + // for oracle/pg database string must be ' + if (table_type == TOdbcTableType::ORACLE || table_type == TOdbcTableType::POSTGRESQL) { + fmt::format_to(_insert_stmt_buffer, "'{}'", fmt::basic_string_view(item, size)); + } else { + fmt::format_to(_insert_stmt_buffer, "\"{}\"", fmt::basic_string_view(item, size)); + } break; } case TYPE_ARRAY: { diff --git a/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql b/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql index 02c257cbc8..8fb1aebc4b 100644 --- a/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql +++ b/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql @@ -223,4 +223,9 @@ create table doris_test.ex_tb20 ( decimal_unsigned_long decimal(65, 5) unsigned ) engine=innodb charset=utf8; +create table doris_test.test_insert ( + `id` varchar(128) NULL, + `name` varchar(128) NULL, + `age` int NULL +) engine=innodb charset=utf8; diff --git a/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql b/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql index d5dd8cf1c6..d2d8d6af7e 100644 --- a/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql +++ b/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql @@ -78,3 +78,9 @@ t4 timestamp, t5 interval year(3) to month, t6 interval day(3) to second(6) ); + +create table doris_test.test_insert( +id varchar2(128), +name varchar2(128), +age number(5) +); diff --git a/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql b/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql index b721da297a..6ace3b20cb 100644 --- a/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql +++ b/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql @@ -150,3 +150,9 @@ CREATE TABLE catalog_pg_test.test12 ( ID INT NOT NULL, uuid_value uuid ); + +CREATE TABLE catalog_pg_test.test_insert ( + id varchar(128), + name varchar(128), + age int +); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index 44140b24e9..891fe3349b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -31,6 +31,8 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.JdbcExternalDatabase; +import org.apache.doris.catalog.external.JdbcExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; @@ -39,6 +41,8 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.JdbcExternalCatalog; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; @@ -110,7 +114,7 @@ public class InsertStmt extends DdlStmt { private Table targetTable; - private Database db; + private DatabaseIf db; private long transactionId; // we need a new TupleDesc for olap table. @@ -191,12 +195,15 @@ public class InsertStmt extends DdlStmt { // get dbs of statement queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet); tblName.analyze(analyzer); - // disallow external catalog - Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName()); + // disallow external catalog except JdbcExternalCatalog + if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog + && !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) { + Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName()); + } String dbName = tblName.getDb(); String tableName = tblName.getTbl(); // check exist - DatabaseIf db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbName); + DatabaseIf db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(dbName); TableIf table = db.getTableOrAnalysisException(tblName.getTbl()); // check access @@ -247,7 +254,7 @@ public class InsertStmt extends DdlStmt { return dataSink; } - public Database getDbObj() { + public DatabaseIf getDbObj() { return db; } @@ -261,8 +268,11 @@ public class InsertStmt extends DdlStmt { if (targetTable == null) { tblName.analyze(analyzer); - // disallow external catalog - Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName()); + // disallow external catalog except JdbcExternalCatalog + if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog + && !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) { + Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName()); + } } // Check privilege @@ -292,8 +302,7 @@ public class InsertStmt extends DdlStmt { // create data sink createDataSink(); - db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(tblName.getDb()); - + db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); // create label and begin transaction long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS(); if (Strings.isNullOrEmpty(label)) { @@ -322,8 +331,16 @@ public class InsertStmt extends DdlStmt { private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException { // Get table if (targetTable == null) { - DatabaseIf db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(tblName.getDb()); - targetTable = (Table) db.getTableOrAnalysisException(tblName.getTbl()); + DatabaseIf db = analyzer.getEnv().getCatalogMgr() + .getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); + if (db instanceof Database) { + targetTable = (Table) db.getTableOrAnalysisException(tblName.getTbl()); + } else if (db instanceof JdbcExternalDatabase) { + JdbcExternalTable jdbcTable = (JdbcExternalTable) db.getTableOrAnalysisException(tblName.getTbl()); + targetTable = jdbcTable.getJdbcTable(); + } else { + throw new AnalysisException("Not support insert target table."); + } } if (targetTable instanceof OlapTable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 347bb33205..dec2a4298f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -18,6 +18,7 @@ package org.apache.doris.transaction; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; @@ -664,7 +665,7 @@ public class DatabaseTransactionMgr { LOG.info("transaction:[{}] successfully committed", transactionState); } - public boolean waitForTransactionFinished(Database db, long transactionId, long timeoutMillis) + public boolean waitForTransactionFinished(DatabaseIf db, long transactionId, long timeoutMillis) throws TransactionCommitFailedException { TransactionState transactionState = null; readLock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index f12e219771..3c84056789 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -18,6 +18,7 @@ package org.apache.doris.transaction; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; @@ -243,13 +244,13 @@ public class GlobalTransactionMgr implements Writable { dbTransactionMgr.commitTransaction(null, transactionId, null, null, true); } - public boolean commitAndPublishTransaction(Database db, List<Table> tableList, long transactionId, + public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis) throws UserException { return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null); } - public boolean commitAndPublishTransaction(Database db, List<Table> tableList, long transactionId, + public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment) throws UserException { diff --git a/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out b/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out index 1d7cfa7a41..22a7349c64 100644 --- a/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out +++ b/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out @@ -164,6 +164,19 @@ bca 2022-11-02 2022-11-02 8012 vivo 1.12345 1.12345 1.12345 1.12345 1.12345 1.12345 123456789012345678901234567890123.12345 12345678901234567890123456789012.12345 1234567890123456789012345678901234.12345 123456789012345678901234567890123.12345 123456789012345678901234567890123456789012345678901234567890.12345 123456789012345678901234567890123456789012345678901234567890.12345 +-- !test_insert1 -- +doris1 18 + +-- !test_insert2 -- +doris2 19 +doris3 20 + +-- !test_insert3 -- +doris2 19 +doris2 19 +doris3 20 +doris3 20 + -- !ex_tb1 -- {"k1":"v1", "k2":"v2"} diff --git a/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out b/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out index 9f45b5584c..cc5c0f50f2 100644 --- a/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out +++ b/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out @@ -41,3 +41,16 @@ 7 \N \N \N \N 223-9 \N 8 \N \N \N \N \N 12 10:23:1.123457 +-- !test_insert1 -- +doris1 18 + +-- !test_insert2 -- +doris2 19 +doris3 20 + +-- !test_insert3 -- +doris2 19 +doris2 19 +doris3 20 +doris3 20 + diff --git a/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out b/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out index 3e1c7f8058..ac0ff72029 100644 --- a/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out +++ b/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out @@ -2136,6 +2136,19 @@ true abc def 2022-10-11 1.234 1 2 99 2022-10-22T10:59:59 34.123 1 980dd890-f7fe-4fff-999d-873516108b2e 2 980dd890-f7fe-4fff-999d-873516108b2e +-- !test_insert1 -- +doris1 18 + +-- !test_insert2 -- +doris2 19 +doris3 20 + +-- !test_insert3 -- +doris2 19 +doris2 19 +doris3 20 +doris3 20 + -- !test_old -- 123 abc 123 abc diff --git a/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy b/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy index 656fe06848..7132b638c1 100644 --- a/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy +++ b/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy @@ -45,6 +45,7 @@ suite("test_mysql_jdbc_catalog", "p0") { String ex_tb18 = "ex_tb18"; String ex_tb19 = "ex_tb19"; String ex_tb20 = "ex_tb20"; + String test_insert = "test_insert"; sql """ADMIN SET FRONTEND CONFIG ("enable_decimal_conversion" = "true");""" sql """drop catalog if exists ${catalog_name} """ @@ -101,8 +102,20 @@ suite("test_mysql_jdbc_catalog", "p0") { order_qt_ex_tb19 """ select * from ${ex_tb19} order by date_value; """ order_qt_ex_tb20 """ select * from ${ex_tb20} order by decimal_normal; """ - sql """drop catalog if exists ${catalog_name} """ - sql """drop resource if exists ${resource_name}""" + // test insert + String uuid1 = UUID.randomUUID().toString(); + sql """ insert into ${test_insert} values ('${uuid1}', 'doris1', 18) """ + order_qt_test_insert1 """ select name, age from ${test_insert} where id = '${uuid1}' order by age """ + + String uuid2 = UUID.randomUUID().toString(); + sql """ insert into ${test_insert} values ('${uuid2}', 'doris2', 19), ('${uuid2}', 'doris3', 20) """ + order_qt_test_insert2 """ select name, age from ${test_insert} where id = '${uuid2}' order by age """ + + sql """ insert into ${test_insert} select * from ${test_insert} where id = '${uuid2}' """ + order_qt_test_insert3 """ select name, age from ${test_insert} where id = '${uuid2}' order by age """ + + sql """ drop catalog if exists ${catalog_name} """ + sql """ drop resource if exists ${resource_name} """ // test old create-catalog syntax for compatibility sql """ CREATE CATALOG ${catalog_name} PROPERTIES ( @@ -113,9 +126,9 @@ suite("test_mysql_jdbc_catalog", "p0") { "jdbc.driver_url" = "https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar", "jdbc.driver_class" = "com.mysql.cj.jdbc.Driver"); """ - sql """switch ${catalog_name}""" - sql """use ${ex_db_name}""" + sql """ switch ${catalog_name} """ + sql """ use ${ex_db_name} """ order_qt_ex_tb1 """ select * from ${ex_tb1} order by id; """ - sql """drop resource if exists ${resource_name}""" + sql """ drop catalog if exists ${catalog_name} """ } } diff --git a/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy b/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy index 51a7b174f7..7ddcce6c10 100644 --- a/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy +++ b/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy @@ -23,7 +23,8 @@ suite("test_oracle_jdbc_catalog", "p0") { String internal_db_name = "regression_test_jdbc_catalog_p0"; String ex_db_name = "DORIS_TEST"; String oracle_port = context.config.otherConfigs.get("oracle_11_port"); - String SID = "XE" + String SID = "XE"; + String test_insert = "TEST_INSERT"; String inDorisTable = "doris_in_tb"; @@ -68,6 +69,17 @@ suite("test_oracle_jdbc_catalog", "p0") { // So instead of qt, we're using sql here. sql """ select * from TEST_RAW order by ID; """ + // test insert + String uuid1 = UUID.randomUUID().toString(); + sql """ insert into ${test_insert} values ('${uuid1}', 'doris1', 18) """ + order_qt_test_insert1 """ select name, age from ${test_insert} where id = '${uuid1}' order by age """ + + String uuid2 = UUID.randomUUID().toString(); + sql """ insert into ${test_insert} values ('${uuid2}', 'doris2', 19), ('${uuid2}', 'doris3', 20) """ + order_qt_test_insert2 """ select name, age from ${test_insert} where id = '${uuid2}' order by age """ + + sql """ insert into ${test_insert} select * from ${test_insert} where id = '${uuid2}' """ + order_qt_test_insert3 """ select name, age from ${test_insert} where id = '${uuid2}' order by age """ sql """drop catalog if exists ${catalog_name} """ sql """drop resource if exists jdbc_resource_catalog_pg""" diff --git a/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy b/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy index 71462cc307..883d15be9e 100644 --- a/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy +++ b/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy @@ -25,6 +25,7 @@ suite("test_pg_jdbc_catalog", "p0") { String ex_schema_name2 = "catalog_pg_test"; String pg_port = context.config.otherConfigs.get("pg_14_port"); String inDorisTable = "doris_in_tb"; + String test_insert = "test_insert"; sql """drop catalog if exists ${catalog_name} """ sql """drop resource if exists ${resource_name}""" @@ -73,6 +74,18 @@ suite("test_pg_jdbc_catalog", "p0") { order_qt_test13 """ select * from test11 order by id; """ order_qt_test14 """ select * from test12 order by id; """ + // test insert + String uuid1 = UUID.randomUUID().toString(); + sql """ insert into ${test_insert} values ('${uuid1}', 'doris1', 18) """ + order_qt_test_insert1 """ select name, age from ${test_insert} where id = '${uuid1}' order by age """ + + String uuid2 = UUID.randomUUID().toString(); + sql """ insert into ${test_insert} values ('${uuid2}', 'doris2', 19), ('${uuid2}', 'doris3', 20) """ + order_qt_test_insert2 """ select name, age from ${test_insert} where id = '${uuid2}' order by age """ + + sql """ insert into ${test_insert} select * from ${test_insert} where id = '${uuid2}' """ + order_qt_test_insert3 """ select name, age from ${test_insert} where id = '${uuid2}' order by age """ + sql """drop catalog if exists ${catalog_name} """ sql """drop resource if exists ${resource_name}""" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org