This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new fb54f38 KYLIN-3707 add configuration for setting isolation-level for sqoop fb54f38 is described below commit fb54f38cdad26b776b6f5116cacfbc8470e19142 Author: woyumen4597 <woyumen4...@gmail.com> AuthorDate: Fri Dec 14 15:03:34 2018 +0800 KYLIN-3707 add configuration for setting isolation-level for sqoop --- .../sdk/datasource/framework/JdbcConnector.java | 4 +- .../framework/conv/DefaultConfiguer.java | 9 ++-- .../datasource/framework/conv/SqlConverter.java | 6 +++ .../framework/conv/SqlConverterTest.java | 62 +++++++++++++++++----- .../source/jdbc/extensible/JdbcHiveInputBase.java | 12 +++-- 5 files changed, 70 insertions(+), 23 deletions(-) diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java index d849e6c..47ba6b3 100644 --- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java +++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java @@ -37,7 +37,6 @@ import org.apache.kylin.sdk.datasource.framework.def.DataSourceDefProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; public class JdbcConnector implements Closeable { @@ -175,8 +174,7 @@ public class JdbcConnector implements Closeable { return jdbcDs.getPropertyValue(key); } - @VisibleForTesting - SqlConverter getSqlConverter() { + public SqlConverter getSqlConverter() { return sqlConverter; } diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java index 6d7fb6d..6c01a70 100644 --- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java +++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java @@ -78,10 +78,6 @@ public class DefaultConfiguer implements SqlConverter.IConfigurer{ if (this.adaptor == null) { return orig; } - // fix problem of case sensitive when generate sql. -// if (isCaseSensitive()) { -// orig = adaptor.fixCaseSensitiveSql(orig); -// } return adaptor.fixSql(orig); } @@ -134,4 +130,9 @@ public class DefaultConfiguer implements SqlConverter.IConfigurer{ } return adaptor.fixIdentifierCaseSensitve(orig); } + + @Override + public String getTransactionIsolationLevel() { + return dsDef.getPropertyValue("transaction.isolation-level"); + } } diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java index d25c04f..e8302e8 100644 --- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java +++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java @@ -110,5 +110,11 @@ public class SqlConverter { boolean enableQuote(); String fixIdentifierCaseSensitve(String orig); + + /** + * Only support following 3 types + * TRANSACTION_READ_COMMITTED,TRANSACTION_READ_UNCOMMITTED,TRANSACTION_READ_COMMITTED + */ + String getTransactionIsolationLevel(); } } diff --git a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java index 94cc651..451be60 100644 --- a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java +++ b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java @@ -111,12 +111,20 @@ public class SqlConverterTest extends LocalFileMetadataTestCase { public String fixIdentifierCaseSensitve(String orig) { return orig; } + + @Override + public String getTransactionIsolationLevel() { + return null; + } }, master); // escape default keywords - Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"DEFAULT\".FACT")); - Assert.assertEquals("SELECT *\nFROM \"Default\".\"FACT\"", converter.convertSql("select * from \"Default\".FACT")); - Assert.assertEquals("SELECT *\nFROM \"default\".\"FACT\"", converter.convertSql("select * from \"default\".FACT")); + Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", + converter.convertSql("select * from \"DEFAULT\".FACT")); + Assert.assertEquals("SELECT *\nFROM \"Default\".\"FACT\"", + converter.convertSql("select * from \"Default\".FACT")); + Assert.assertEquals("SELECT *\nFROM \"default\".\"FACT\"", + converter.convertSql("select * from \"default\".FACT")); } @Test @@ -189,6 +197,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase { public String fixIdentifierCaseSensitve(String orig) { return orig; } + + @Override + public String getTransactionIsolationLevel() { + return null; + } }, master); // normal cases @@ -203,17 +216,20 @@ public class SqlConverterTest extends LocalFileMetadataTestCase { // escape default keywords Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from DEFAULT.FACT")); - Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"DEFAULT\".FACT")); - Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"Default\".FACT")); - Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"default\".FACT")); + Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", + converter.convertSql("select * from \"DEFAULT\".FACT")); + Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", + converter.convertSql("select * from \"Default\".FACT")); + Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", + converter.convertSql("select * from \"default\".FACT")); // function mapping Assert.assertEquals("SELECT EXTRACT(DOY FROM \"PART_DT\")\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select DAYOFYEAR(PART_DT) from \"DEFAULT\".FACT")); Assert.assertEquals( - "SELECT 12 * (EXTRACT(YEAR FROM \"DT1\") - EXTRACT(YEAR FROM \"DT2\")) + EXTRACT(MONTH FROM \"DT1\") - EXTRACT(MONTH FROM \"DT2\") - " + - "CASE WHEN EXTRACT(DAY FROM \"DT2\") > EXTRACT(DAY FROM \"DT1\") THEN 1 ELSE 0 END\n" + - "FROM \"DEFAULT\".\"FACT\"", + "SELECT 12 * (EXTRACT(YEAR FROM \"DT1\") - EXTRACT(YEAR FROM \"DT2\")) + EXTRACT(MONTH FROM \"DT1\") - EXTRACT(MONTH FROM \"DT2\") - " + + "CASE WHEN EXTRACT(DAY FROM \"DT2\") > EXTRACT(DAY FROM \"DT1\") THEN 1 ELSE 0 END\n" + + "FROM \"DEFAULT\".\"FACT\"", converter.convertSql("select TIMESTAMPDIFF(month,DT2, DT1) from \"DEFAULT\".FACT")); Assert.assertEquals("SELECT TRUNC(\"ID\")\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select cast(ID as INT) from \"DEFAULT\".FACT")); @@ -221,12 +237,14 @@ public class SqlConverterTest extends LocalFileMetadataTestCase { converter.convertSql("select 1 from a where 1 BETWEEN 0 and 2")); Assert.assertEquals("SELECT \"CURRENT_DATE\", TEST_CURR_TIME()", converter.convertSql("select CURRENT_DATE, CURRENT_TIME")); - Assert.assertEquals("SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".\"FACT\"", + Assert.assertEquals( + "SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql( "select exp(avg(ln(dayofyear(cast('2018-03-20' as date))))) from \"DEFAULT\".FACT")); // over function - Assert.assertEquals("SELECT STDDEVP(\"C1\") OVER (ORDER BY \"C1\")\nFROM \"TEST_SUITE\"\nFETCH NEXT 1 ROWS ONLY", + Assert.assertEquals( + "SELECT STDDEVP(\"C1\") OVER (ORDER BY \"C1\")\nFROM \"TEST_SUITE\"\nFETCH NEXT 1 ROWS ONLY", converter.convertSql("select stddev_pop(c1) over(order by c1) from test_suite limit 1")); // type mapping @@ -332,6 +350,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase { public String fixIdentifierCaseSensitve(String orig) { return orig; } + + @Override + public String getTransactionIsolationLevel() { + return null; + } }, master); Assert.assertEquals("SELECT 1\nORDER BY 2\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY", @@ -347,6 +370,7 @@ public class SqlConverterTest extends LocalFileMetadataTestCase { Assert.assertEquals("SELECT 1\nORDER BY 1\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY", converter.convertSql("SELECT 1 LIMIT 1")); } + @Test public void testConvertQuotedSqlWithEscape() throws SQLException { DataSourceDefProvider provider = DataSourceDefProvider.getInstance(); @@ -417,6 +441,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase { public String fixIdentifierCaseSensitve(String orig) { return orig; } + + @Override + public String getTransactionIsolationLevel() { + return null; + } }, master); Assert.assertEquals("SELECT SUM(\"A\"), COUNT(\"A\") AS \"AB\"\nFROM \"DEFAULT\".\"CUBE\"", @@ -425,8 +454,10 @@ public class SqlConverterTest extends LocalFileMetadataTestCase { converter.convertSql("select A(), B(`A`), cast(`PRICE@@` as `DDD`) from DEFAULT.`CUBE`")); Assert.assertEquals("SELECT A(), B(\"A\"), CAST(\"PRICE@@\" AS DDD)\nFROM \"DEFAULT\".\"CUBE\"", converter.convertSql("select A(), B(\"A\"), cast(\"PRICE@@\" as \"DDD\") from \"DEFAULT\".\"CUBE\"")); - Assert.assertEquals("SELECT \"kylin_sales\".\"price_@@\", \"kylin_sales\".\"count\"\nFROM \"cube\".\"kylin_sales\"\nWHERE \"kylin_sales\".\"price_@@\" > 1 AND \"kylin_sales\".\"count\" < 50", - converter.convertSql("select `kylin_sales`.`price_@@`, `kylin_sales`.`count` from `cube`.`kylin_sales` where `kylin_sales`.`price_@@` > 1 and `kylin_sales`.`count` < 50")); + Assert.assertEquals( + "SELECT \"kylin_sales\".\"price_@@\", \"kylin_sales\".\"count\"\nFROM \"cube\".\"kylin_sales\"\nWHERE \"kylin_sales\".\"price_@@\" > 1 AND \"kylin_sales\".\"count\" < 50", + converter.convertSql( + "select `kylin_sales`.`price_@@`, `kylin_sales`.`count` from `cube`.`kylin_sales` where `kylin_sales`.`price_@@` > 1 and `kylin_sales`.`count` < 50")); Assert.assertEquals("SELECT COUNT(DISTINCT \"price_#@\")\nFROM \"cube\".\"kylin_sales\"", converter.convertSql("select count(distinct `price_#@`) from `cube`.`kylin_sales`")); @@ -502,6 +533,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase { public String fixIdentifierCaseSensitve(String orig) { return orig.toUpperCase(Locale.ROOT); } + + @Override + public String getTransactionIsolationLevel() { + return null; + } }, master); Assert.assertEquals("\"TEST\".\"AA\"", converter.convertColumn("`test`.`aa`", "`")); diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java index 10eb31e..ec69084 100644 --- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java +++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java @@ -29,6 +29,7 @@ import org.apache.kylin.metadata.model.PartitionDesc; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.sdk.datasource.framework.JdbcConnector; +import org.apache.kylin.sdk.datasource.framework.conv.SqlConverter; import org.apache.kylin.source.jdbc.sqoop.SqoopCmdStep; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,15 +83,15 @@ public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInpu String filedDelimiter = config.getJdbcSourceFieldDelimiter(); int mapperNum = config.getSqoopMapperNum(); - String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn, - splitDatabase, splitTable, splitTableAlias); + String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn, + splitColumn, splitDatabase, splitTable, splitTableAlias); bquery = dataSource.convertSql(bquery); if (partitionDesc.isPartitioned()) { SegmentRange segRange = flatDesc.getSegRange(); if (segRange != null && !segRange.isInfinite()) { if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias) && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc - .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) { + .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) { String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc, partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc, flatDesc.getSegment(), segRange), @@ -110,6 +111,11 @@ public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInpu dataSource.getJdbcUrl(), dataSource.getJdbcDriver(), dataSource.getJdbcUser(), dataSource.getJdbcPassword(), selectSql, jobWorkingDir, hiveTable, splitColumn, bquery, filedDelimiter, mapperNum); + SqlConverter.IConfigurer configurer = dataSource.getSqlConverter().getConfigurer(); + if (configurer.getTransactionIsolationLevel() != null) { + cmd = cmd + " --relaxed-isolation --metadata-transaction-isolation-level " + + configurer.getTransactionIsolationLevel(); + } logger.debug("sqoop cmd: {}", cmd); SqoopCmdStep step = new SqoopCmdStep();