This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4bb90cbab34f4168aa6be1050388d10413dd0816 Author: Hang Jia <754332...@qq.com> AuthorDate: Sat Nov 19 11:08:56 2022 +0800 KYLIN-5394 Project-level push-down query does not take effect (jdbc push-down query) * KYLIN-5394 Project-level push-down query does not take effect (jdbc push-down query) * fix UT in gcp: add await time in checkSnapshotSourceTableStatsJsonFile Co-authored-by: jlf <longfei.ji...@kyligence.io> --- .../kylin/source/adhocquery/IPushDownRunner.java | 7 +++ .../sdk/datasource/PushDownRunnerSDKImpl.java | 7 ++- .../sdk/datasource/PushdownRunnerSDKImplTest.java | 6 +-- .../org/apache/kylin/query/util/PushDownUtil.java | 6 +-- .../SnapshotSourceTableStatsServiceTest.scala | 7 ++- .../kylin/query/pushdown/DataSourceConfig.java | 47 +++++++++++++++++ .../pushdown/JdbcPushDownConnectionManager.java | 26 ++++++--- .../query/pushdown/PushDownRunnerJdbcImpl.java | 11 ++-- .../query/pushdown/PushDownRunnerSparkImpl.java | 7 ++- .../query/pushdown/PushDownRunnerJdbcImplTest.java | 61 ++++++++++++++++++---- .../pushdown/PushDownRunnerSparkImplTest.java | 12 ++--- 11 files changed, 162 insertions(+), 35 deletions(-) diff --git a/src/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java b/src/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java index 558cca08d1..d442600a19 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java @@ -27,6 +27,13 @@ import com.google.common.collect.Lists; public interface IPushDownRunner { + void init(KylinConfig config, String project); + + /** + * @deprecated for compatibility with old plugins + * @param config + */ + @Deprecated void init(KylinConfig config); /** diff --git a/src/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/PushDownRunnerSDKImpl.java b/src/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/PushDownRunnerSDKImpl.java index 9426ccbe5e..5660e92ec9 100644 --- a/src/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/PushDownRunnerSDKImpl.java +++ b/src/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/PushDownRunnerSDKImpl.java @@ -29,11 +29,11 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.util.DBUtils; +import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.sdk.datasource.framework.JdbcConnector; import org.apache.kylin.sdk.datasource.framework.SourceConnectorFactory; import org.apache.kylin.source.adhocquery.IPushDownRunner; -import org.apache.kylin.metadata.project.NProjectManager; import lombok.extern.slf4j.Slf4j; @@ -46,6 +46,11 @@ public class PushDownRunnerSDKImpl implements IPushDownRunner { dataSource = SourceConnectorFactory.getJdbcConnector(config); } + @Override + public void init(KylinConfig config, String project) { + init(config); + } + @Override public void executeQuery(String query, List<List<String>> returnRows, List<SelectedColumnMeta> returnColumnMeta, String project) throws Exception { diff --git a/src/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/PushdownRunnerSDKImplTest.java b/src/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/PushdownRunnerSDKImplTest.java index a275b07491..df9a5a2599 100644 --- a/src/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/PushdownRunnerSDKImplTest.java +++ b/src/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/PushdownRunnerSDKImplTest.java @@ -20,10 +20,10 @@ package org.apache.kylin.sdk.datasource; import java.util.List; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.sdk.datasource.framework.JdbcConnectorTest; -import org.apache.kylin.metadata.project.NProjectManager; import org.junit.Assert; import org.junit.Test; @@ -38,7 +38,7 @@ public class PushdownRunnerSDKImplTest extends JdbcConnectorTest { npr.updateProject(projectInstance); PushDownRunnerSDKImpl pushDownRunnerSDK = new PushDownRunnerSDKImpl(); - pushDownRunnerSDK.init(getTestConfig()); + pushDownRunnerSDK.init(getTestConfig(), projectInstance.getName()); List<List<String>> returnRows = Lists.newArrayList(); List<SelectedColumnMeta> returnColumnMeta = Lists.newArrayList(); String sql = "select count(*) from LINEORDER"; @@ -49,7 +49,7 @@ public class PushdownRunnerSDKImplTest extends JdbcConnectorTest { @Test public void testExecuteUpdate() throws Exception { PushDownRunnerSDKImpl pushDownRunnerSDK = new PushDownRunnerSDKImpl(); - pushDownRunnerSDK.init(getTestConfig()); + pushDownRunnerSDK.init(getTestConfig(), null); String sql = "update SSB.LINEORDER set LO_TAX=1 where LO_ORDERKEY = 1"; pushDownRunnerSDK.executeUpdate(sql, null); } diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java index 9fad04509b..4707d9406a 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java @@ -107,7 +107,7 @@ public class PushDownUtil { } IPushDownRunner runner = (IPushDownRunner) ClassUtil.newInstance(kylinConfig.getPushDownRunnerClassName()); - runner.init(kylinConfig); + runner.init(kylinConfig, project); logger.debug("Query Pushdown runner {}", runner); // set pushdown engine in query context @@ -213,7 +213,7 @@ public class PushDownUtil { // pushdown IPushDownRunner runner = (IPushDownRunner) ClassUtil .newInstance(kylinConfig.getPartitionCheckRunnerClassNameWithDefaultValue()); - runner.init(kylinConfig); + runner.init(kylinConfig, project); runner.executeQuery(sql, returnRows, returnColumnMeta, project); return Pair.newPair(returnRows, returnColumnMeta); @@ -222,7 +222,7 @@ public class PushDownUtil { public static void trySimplePushDownExecute(String sql, String project) throws Exception { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); IPushDownRunner runner = (IPushDownRunner) ClassUtil.newInstance(kylinConfig.getPushDownRunnerClassName()); - runner.init(kylinConfig); + runner.init(kylinConfig, project); runner.executeUpdate(sql, project); } diff --git a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala index 6663a71041..5e0fdb3dc3 100644 --- a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala +++ b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala @@ -179,7 +179,7 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local + Constants.SOURCE_TABLE_STATS + "/" + tableIdentity) try { val out = fs.create(statsFile, true) - try out.write(new String().getBytes()) + try out.write(new String("{}").getBytes()) catch { case e: IOException => log.error(s"overwrite stats file [$statsFile] failed!", e) @@ -482,6 +482,8 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local val tableIdentity = table.qualifiedName.toLowerCase(Locale.ROOT) writeEmptyJsonFile(tableIdentity) writeMarkFile() + await.pollDelay(Duration.ONE_SECOND).until(() => true) + var checkStatsFile = snapshotSourceTableStatsService.checkSnapshotSourceTableStatsJsonFile(DEFAULT_PROJECT, tableIdentity) assertFalse(checkStatsFile) @@ -507,6 +509,8 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local val tableIdentity = table.qualifiedName.toLowerCase(Locale.ROOT) writeEmptyJsonFile(tableIdentity) writeMarkFile() + await.pollDelay(Duration.ONE_SECOND).until(() => true) + var checkStatsFile = snapshotSourceTableStatsService.checkSnapshotSourceTableStatsJsonFile(DEFAULT_PROJECT, tableIdentity) assertFalse(checkStatsFile) @@ -516,6 +520,7 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local checkStatsFile = snapshotSourceTableStatsService.checkSnapshotSourceTableStatsJsonFile(DEFAULT_PROJECT, tableIdentity) assertTrue(checkStatsFile) + await.pollDelay(Duration.ONE_SECOND).until(() => true) writeMarkFile() checkStatsFile = snapshotSourceTableStatsService.checkSnapshotSourceTableStatsJsonFile(DEFAULT_PROJECT, tableIdentity) assertFalse(checkStatsFile) diff --git a/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/DataSourceConfig.java b/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/DataSourceConfig.java new file mode 100644 index 0000000000..dd0955f616 --- /dev/null +++ b/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/DataSourceConfig.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.pushdown; + +import org.apache.kylin.common.KylinConfig; + +import lombok.EqualsAndHashCode; + +/** + * @author hang.jia + */ +@EqualsAndHashCode +public class DataSourceConfig { + String driverClassName; + String url; + String username; + String password; + int maxActive; + int maxIdle; + int minIdle; + + public DataSourceConfig(KylinConfig config) { + driverClassName = config.getJdbcDriverClass(); + url = config.getJdbcUrl(); + username = config.getJdbcUsername(); + password = config.getJdbcPassword(); + maxActive = config.getPoolMaxTotal(); + maxIdle = config.getPoolMaxIdle(); + minIdle = config.getPoolMinIdle(); + } +} diff --git a/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/JdbcPushDownConnectionManager.java b/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/JdbcPushDownConnectionManager.java index e4a625f837..b6f3e9528d 100644 --- a/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/JdbcPushDownConnectionManager.java +++ b/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/JdbcPushDownConnectionManager.java @@ -20,29 +20,43 @@ package org.apache.kylin.query.pushdown; import java.sql.Connection; import java.sql.SQLException; +import java.util.Map; import org.apache.commons.dbcp.BasicDataSource; import org.apache.kylin.common.KylinConfig; +import com.google.common.collect.Maps; + public class JdbcPushDownConnectionManager { - private volatile static JdbcPushDownConnectionManager manager = null; + private static final Map<String, JdbcPushDownConnectionManager> managerMap = Maps.newConcurrentMap(); - static JdbcPushDownConnectionManager getConnectionManager(KylinConfig config) throws ClassNotFoundException { - if (manager == null) { + static JdbcPushDownConnectionManager getConnectionManager(KylinConfig config, String project) + throws ClassNotFoundException { + JdbcPushDownConnectionManager manager = managerMap.get(project); + DataSourceConfig newDataSourceConfig = new DataSourceConfig(config); + if (needUpdateProjectConnectionManager(manager, newDataSourceConfig)) { synchronized (JdbcPushDownConnectionManager.class) { - if (manager == null) { - manager = new JdbcPushDownConnectionManager(config); + if (needUpdateProjectConnectionManager(manager, newDataSourceConfig)) { + manager = new JdbcPushDownConnectionManager(config, newDataSourceConfig); + managerMap.put(project, manager); } } } return manager; } + static boolean needUpdateProjectConnectionManager(JdbcPushDownConnectionManager manager, DataSourceConfig config) { + return manager == null || !manager.dataSourceConfig.equals(config); + } + private final BasicDataSource dataSource; + private final DataSourceConfig dataSourceConfig; - private JdbcPushDownConnectionManager(KylinConfig config) throws ClassNotFoundException { + private JdbcPushDownConnectionManager(KylinConfig config, DataSourceConfig newDataSourceConfig) + throws ClassNotFoundException { dataSource = new BasicDataSource(); + dataSourceConfig = newDataSourceConfig; Class.forName(config.getJdbcDriverClass()); dataSource.setDriverClassName(config.getJdbcDriverClass()); diff --git a/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/PushDownRunnerJdbcImpl.java b/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/PushDownRunnerJdbcImpl.java index c18a024d6f..4b53b720c0 100644 --- a/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/PushDownRunnerJdbcImpl.java +++ b/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/PushDownRunnerJdbcImpl.java @@ -37,17 +37,22 @@ public class PushDownRunnerJdbcImpl implements IPushDownRunner { private JdbcPushDownConnectionManager manager = null; @Override - public void init(KylinConfig config) { + public void init(KylinConfig config, String project) { try { - manager = JdbcPushDownConnectionManager.getConnectionManager(config); + manager = JdbcPushDownConnectionManager.getConnectionManager(config, project); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } } + @Override + public void init(KylinConfig config) { + init(config, ""); + } + @Override public void executeQuery(String query, List<List<String>> results, List<SelectedColumnMeta> columnMetas, - String project) throws Exception { + String project) throws Exception { Statement statement = null; Connection connection = manager.getConnection(); ResultSet resultSet = null; diff --git a/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImpl.java b/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImpl.java index be5bcfd908..81887d9c8f 100644 --- a/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImpl.java +++ b/src/spark-project/sparder/src/main/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImpl.java @@ -23,10 +23,10 @@ import java.util.List; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.metadata.query.StructField; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.source.adhocquery.IPushDownRunner; import org.apache.kylin.source.adhocquery.PushdownResult; -import org.apache.kylin.metadata.query.StructField; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +38,11 @@ public class PushDownRunnerSparkImpl implements IPushDownRunner { // SparkSession has been initialized } + @Override + public void init(KylinConfig config, String project) { + init(config); + } + @Override public void executeQuery(String query, List<List<String>> results, List<SelectedColumnMeta> columnMetas, String project) { diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerJdbcImplTest.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerJdbcImplTest.java index 9c99102c50..27fd11e90d 100644 --- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerJdbcImplTest.java +++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerJdbcImplTest.java @@ -18,26 +18,28 @@ package org.apache.kylin.query.pushdown; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import lombok.val; +import java.io.File; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.LinkedHashMap; +import java.util.List; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; -import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.TempMetadataBuilder; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.File; -import java.sql.Connection; -import java.sql.DriverManager; -import java.util.LinkedHashMap; -import java.util.List; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import lombok.val; public class PushDownRunnerJdbcImplTest extends NLocalFileMetadataTestCase { @@ -122,11 +124,48 @@ public class PushDownRunnerJdbcImplTest extends NLocalFileMetadataTestCase { npr.updateProject(projectInstance); KylinConfigExt config = projectInstance.getConfig(); PushDownRunnerJdbcImpl pushDownRunnerJdbc = new PushDownRunnerJdbcImpl(); - pushDownRunnerJdbc.init(config); + pushDownRunnerJdbc.init(config, projectInstance.getName()); + String sql = "select 1"; + List<List<String>> returnRows = Lists.newArrayList(); + List<SelectedColumnMeta> returnColumnMeta = Lists.newArrayList(); + pushDownRunnerJdbc.executeQuery(sql, returnRows, returnColumnMeta, "default"); + Assert.assertEquals("1", returnRows.get(0).get(0)); + } + + @Test + public void testProjectPushDownJdbc() throws Exception { + NProjectManager npr = NProjectManager.getInstance(getTestConfig()); + ProjectInstance projectInstance = npr.getProject("default"); + projectInstance.setDefaultDatabase("SSB"); + LinkedHashMap<String, String> overrideKylinProps = Maps.newLinkedHashMap(); + overrideKylinProps.put("kylin.query.pushdown.jdbc.url", "jdbc:h2:mem:db_default"); + overrideKylinProps.put("kylin.query.pushdown.jdbc.driver", "org.h2.Driver"); + overrideKylinProps.put("kylin.query.pushdown.jdbc.username", "sa"); + overrideKylinProps.put("kylin.query.pushdown.jdbc.password", ""); + projectInstance.setOverrideKylinProps(overrideKylinProps); + npr.updateProject(projectInstance); + KylinConfigExt config = projectInstance.getConfig(); + PushDownRunnerJdbcImpl pushDownRunnerJdbc = new PushDownRunnerJdbcImpl(); + pushDownRunnerJdbc.init(config, projectInstance.getName()); String sql = "select 1"; List<List<String>> returnRows = Lists.newArrayList(); List<SelectedColumnMeta> returnColumnMeta = Lists.newArrayList(); pushDownRunnerJdbc.executeQuery(sql, returnRows, returnColumnMeta, "default"); Assert.assertEquals("1", returnRows.get(0).get(0)); + + ProjectInstance projectInstance2 = npr.getProject("demo"); + projectInstance2.setDefaultDatabase("SSB"); + overrideKylinProps = Maps.newLinkedHashMap(); + overrideKylinProps.put("kylin.query.pushdown.jdbc.url", "jdbc:h2:mem:db_default"); + overrideKylinProps.put("kylin.query.pushdown.jdbc.driver", "org.h2.Driver"); + overrideKylinProps.put("kylin.query.pushdown.jdbc.username", "sa"); + overrideKylinProps.put("kylin.query.pushdown.jdbc.password", ""); + projectInstance2.setOverrideKylinProps(overrideKylinProps); + npr.updateProject(projectInstance2); + config = projectInstance2.getConfig(); + pushDownRunnerJdbc = new PushDownRunnerJdbcImpl(); + pushDownRunnerJdbc.init(config, projectInstance2.getName()); + pushDownRunnerJdbc.executeQuery(sql, returnRows, returnColumnMeta, "demo"); + Assert.assertEquals("1", returnRows.get(0).get(0)); } } diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java index 46c8a8fa7c..5694f11483 100644 --- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java +++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java @@ -23,8 +23,8 @@ import java.util.List; import java.util.UUID; import org.apache.kylin.common.QueryContext; -import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.spark.sql.SparderEnv; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; @@ -70,7 +70,7 @@ public class PushDownRunnerSparkImplTest extends NLocalFileMetadataTestCase { @Test public void testCast() { PushDownRunnerSparkImpl pushDownRunnerSpark = new PushDownRunnerSparkImpl(); - pushDownRunnerSpark.init(null); + pushDownRunnerSpark.init(null, "tpch"); List<List<String>> returnRows = Lists.newArrayList(); List<SelectedColumnMeta> returnColumnMeta = Lists.newArrayList(); @@ -108,7 +108,7 @@ public class PushDownRunnerSparkImplTest extends NLocalFileMetadataTestCase { @Test public void testPushDownRunnerSpark() { PushDownRunnerSparkImpl pushDownRunnerSpark = new PushDownRunnerSparkImpl(); - pushDownRunnerSpark.init(null); + pushDownRunnerSpark.init(null, "tpch"); List<List<String>> returnRows = Lists.newArrayList(); List<SelectedColumnMeta> returnColumnMeta = Lists.newArrayList(); @@ -124,7 +124,7 @@ public class PushDownRunnerSparkImplTest extends NLocalFileMetadataTestCase { @Test public void testPushDownRunnerSparkWithDotColumn() { PushDownRunnerSparkImpl pushDownRunnerSpark = new PushDownRunnerSparkImpl(); - pushDownRunnerSpark.init(null); + pushDownRunnerSpark.init(null, "tpch"); List<List<String>> returnRows = Lists.newArrayList(); List<SelectedColumnMeta> returnColumnMeta = Lists.newArrayList(); @@ -140,7 +140,7 @@ public class PushDownRunnerSparkImplTest extends NLocalFileMetadataTestCase { @Test public void testSelectTwoSameExpr() { PushDownRunnerSparkImpl pushDownRunnerSpark = new PushDownRunnerSparkImpl(); - pushDownRunnerSpark.init(null); + pushDownRunnerSpark.init(null, "tpch"); List<List<String>> returnRows = Lists.newArrayList(); List<SelectedColumnMeta> returnColumnMeta = Lists.newArrayList(); @@ -156,7 +156,7 @@ public class PushDownRunnerSparkImplTest extends NLocalFileMetadataTestCase { @Test public void testCaseSensitiveOnAlias() { PushDownRunnerSparkImpl pushDownRunnerSpark = new PushDownRunnerSparkImpl(); - pushDownRunnerSpark.init(null); + pushDownRunnerSpark.init(null, "tpch"); List<List<String>> returnRows = Lists.newArrayList(); List<SelectedColumnMeta> returnColumnMeta = Lists.newArrayList();