This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 555358303a KYLIN-5217 Create an initial commit
555358303a is described below

commit 555358303a1a685ce3b6ca970f7a9d5f5029eb50
Author: biaobiao.sun <1319027...@qq.com>
AuthorDate: Tue Aug 9 10:36:12 2022 +0800

    KYLIN-5217 Create an initial commit
---
 .../kap/engine/spark/job/EnviromentAdaptor.java    |  29 +
 .../kap/engine/spark/job/IJobProgressReport.java   |  37 ++
 .../kap/engine/spark/job/ISparkJobHandler.java     |  47 ++
 .../kap/engine/spark/job/ParamsConstants.java      |  29 +
 .../kap/engine/spark/job/SparkAppDescription.java  |  53 ++
 .../engine/spark/application/SparkApplication.java | 620 +++++++++++++++++++++
 .../kylin/engine/spark/application/SparkEntry.java |  27 +
 .../spark/source/NSparkCubingSourceInput.java      |  77 +++
 .../engine/spark/source/NSparkDataSource.java      |  79 +++
 .../spark/source/NSparkMetadataExplorer.java       | 313 +++++++++++
 .../kylin/engine/spark/source/NSparkTable.java     |  65 +++
 .../kylin/engine/spark/source/NSparkTableMeta.java | 111 ++++
 .../spark/source/NSparkTableMetaBuilder.java       | 140 +++++
 .../spark/source/NSparkTableMetaExplorer.java      | 203 +++++++
 .../engine/spark/source/NSparkTableReader.java     |  87 +++
 .../kylin/engine/spark/source/SparkSqlUtil.java    |  66 +++
 16 files changed, 1983 insertions(+)

diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
new file mode 100644
index 0000000000..0301d20c91
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+import java.util.Map;
+
+import org.apache.spark.sql.SparkSession;
+
+public interface EnviromentAdaptor {
+
+
+    Boolean prepareEnviroment(SparkSession spark, Map<String, String> params);
+}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
new file mode 100644
index 0000000000..9e5e58f20f
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+import java.util.Map;
+
+public interface IJobProgressReport {
+
+    boolean updateSparkJobInfo(Map<String, String> params, String url, String json);
+
+    boolean updateSparkJobExtraInfo(Map<String, String> params, String url, String project, String jobId,
+            Map<String, String> extraInfo);
+
+    default boolean executeFinish(Map<String, String> params, String project, String jobId) {
+        return true;
+    }
+
+    default void initArgsParams(Map<String, String> argsParams) {
+    }
+
+}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java
new file mode 100644
index 0000000000..c167b52597
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+
+public interface ISparkJobHandler {
+
+    void killOrphanApplicationIfExists(String project, String jobStepId, KylinConfig config,
+            Map<String, String> sparkConf);
+
+    void checkApplicationJar(KylinConfig config) throws ExecuteException;
+
+    String createArgsFileOnRemoteFileSystem(KylinConfig config, String project, String jobId,
+            Map<String, String> params) throws ExecuteException;
+
+    Object generateSparkCmd(KylinConfig config, SparkAppDescription desc);
+
+    default void modifyDump(Properties props) {
+    }
+
+    default void prepareEnviroment(String project, String jobStepId, Map<String, String> params) {
+    }
+
+    Map<String, String> runSparkSubmit(Object cmd, String parentId) throws ExecuteException;
+
+}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java
new file mode 100644
index 0000000000..e218d319c7
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+public class ParamsConstants {
+
+    private ParamsConstants() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    public static final String TIME_OUT = "time_out";
+    public static final String JOB_TMP_DIR = "job_tmp_dir";
+}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java
new file mode 100644
index 0000000000..8f105a5172
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+import java.util.Map;
+import java.util.Set;
+
+import lombok.Data;
+
+@Data
+public class SparkAppDescription {
+
+    private String hadoopConfDir;
+
+    private String kylinJobJar;
+
+    private String appArgs;
+
+    private String jobNamePrefix;
+
+    private String project;
+
+    private String jobId;
+
+    private int stepId;
+
+    private String sparkSubmitClassName;
+
+    private Map<String, String> sparkConf;
+
+    private Set<String> sparkJars;
+
+    private Set<String> sparkFiles;
+
+    private String comma;
+
+}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
new file mode 100644
index 0000000000..cac224d44b
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.application;
+
+import static org.apache.kylin.engine.spark.job.StageType.WAITE_FOR_RESOURCE;
+import static org.apache.kylin.engine.spark.utils.SparkConfHelper.COUNT_DISTICT;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.cluster.IClusterManager;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.metadata.MetadataStore;
+import org.apache.kylin.common.util.Application;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.TimeZoneUtils;
+import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.engine.spark.job.BuildJobInfos;
+import org.apache.kylin.engine.spark.job.KylinBuildEnv;
+import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
+import org.apache.kylin.engine.spark.job.ResourceDetect;
+import org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
+import org.apache.kylin.engine.spark.job.SegmentBuildJob;
+import org.apache.kylin.engine.spark.job.SparkJobConstants;
+import org.apache.kylin.engine.spark.job.UdfManager;
+import org.apache.kylin.engine.spark.scheduler.ClusterMonitor;
+import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
+import org.apache.kylin.engine.spark.utils.SparkConfHelper;
+import org.apache.kylin.metadata.cube.model.NBatchConstants;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.query.pushdown.SparkSubmitter;
+import org.apache.kylin.query.util.PushDownUtil;
+import org.apache.spark.SparkConf;
+import org.apache.spark.application.NoRetryException;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.sql.KylinSession;
+import org.apache.spark.sql.KylinSession$;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SparkSessionExtensions;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.rules.Rule;
+import org.apache.spark.sql.execution.datasource.AlignmentTableStats;
+import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
+import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import io.kyligence.kap.engine.spark.job.EnviromentAdaptor;
+import io.kyligence.kap.engine.spark.job.IJobProgressReport;
+import io.kyligence.kap.engine.spark.job.ParamsConstants;
+import lombok.val;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+public abstract class SparkApplication implements Application {
+    private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class);
+    private Map<String, String> params = Maps.newHashMap();
+    public static final String JOB_NAME_PREFIX = "job_step_";
+    private IJobProgressReport report;
+
+    protected volatile KylinConfig config;
+    protected volatile String jobId;
+    protected String project;
+    protected int layoutSize = -1;
+    protected BuildJobInfos infos;
+    /**
+     * path for spark app args on HDFS
+     */
+    protected String path;
+
+    private ClusterMonitor clusterMonitor;
+    private final AtomicLong atomicDisconnectSparkMasterTimes = new AtomicLong(0);
+    private final AtomicBoolean atomicUnreachableSparkMaster = new AtomicBoolean(false);
+    private final AtomicReference<SparkConf> atomicSparkConf = new AtomicReference<>(null);
+    private final AtomicReference<SparkSession> atomicSparkSession = new AtomicReference<>(null);
+    private final AtomicReference<KylinBuildEnv> atomicBuildEnv = new AtomicReference<>(null);
+
+    public void execute(String[] args) {
+        try {
+            path = args[0];
+            String argsLine = readArgsFromHDFS();
+            params = JsonUtil.readValueAsMap(argsLine);
+            logger.info("Execute {} with args : {}", this.getClass().getName(), argsLine);
+            execute();
+        } catch (Exception e) {
+            throw new RuntimeException("Error execute " + this.getClass().getName(), e);
+        }
+    }
+
+    public AtomicBoolean getAtomicUnreachableSparkMaster() {
+        return atomicUnreachableSparkMaster;
+    }
+
+    public final Map<String, String> getParams() {
+        return this.params;
+    }
+
+    public final String getParam(String key) {
+        return this.params.get(key);
+    }
+
+    public final void setParam(String key, String value) {
+        this.params.put(key, value);
+    }
+
+    public final boolean contains(String key) {
+        return params.containsKey(key);
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public String getProject() {
+        return project;
+    }
+
+    public KylinConfig getConfig() {
+        return config;
+    }
+
+    public IJobProgressReport getReport() {
+        if (report == null)
+            return new RestfulJobProgressReport();
+        return report;
+    }
+
+    /// backwards compatibility, must have been initialized before invoking #doExecute.
+    protected SparkSession ss;
+
+    public SparkSession getSparkSession() throws NoRetryException {
+        SparkSession sparkSession = atomicSparkSession.get();
+        if (Objects.isNull(sparkSession)) {
+            // shouldn't reach here
+            throw new NoRetryException("spark session shouldn't be null");
+        }
+        return sparkSession;
+    }
+
+    public String readArgsFromHDFS() {
+        val fs = HadoopUtil.getFileSystem(path);
+        String argsLine = null;
+        Path filePath = new Path(path);
+        try (FSDataInputStream inputStream = fs.open(filePath)) {
+            BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
+            argsLine = br.readLine();
+        } catch (IOException e) {
+            logger.error("Error occurred when reading args file: {}", path, e);
+        }
+        return argsLine;
+    }
+
+    /**
+     * get tracking url by application id
+     *
+     * @param sparkSession build sparkSession
+     * @return
+     */
+    public String getTrackingUrl(IClusterManager clusterManager, SparkSession sparkSession) {
+        return clusterManager.getBuildTrackingUrl(sparkSession);
+    }
+
+    private String tryReplaceHostAddress(String url) {
+        String originHost = null;
+        try {
+            URI uri = URI.create(url);
+            originHost = uri.getHost();
+            String hostAddress = InetAddress.getByName(originHost).getHostAddress();
+            return url.replace(originHost, hostAddress);
+        } catch (UnknownHostException uhe) {
+            logger.error("failed to get the ip address of {}, step back to use the origin tracking url.", originHost,
+                    uhe);
+            return url;
+        }
+    }
+
+    private Map<String, String> getTrackingInfo(SparkSession sparkSession, boolean ipAddressPreferred) {
+        IClusterManager clusterManager = atomicBuildEnv.get().clusterManager();
+        String applicationId = sparkSession.sparkContext().applicationId();
+        Map<String, String> extraInfo = new HashMap<>();
+        extraInfo.put("yarn_app_id", applicationId);
+        try {
+            String trackingUrl = getTrackingUrl(clusterManager, sparkSession);
+            if (StringUtils.isBlank(trackingUrl)) {
+                logger.warn("Get tracking url of application {}, but empty url found.", applicationId);
+                return extraInfo;
+            }
+            if (ipAddressPreferred) {
+                trackingUrl = tryReplaceHostAddress(trackingUrl);
+            }
+            extraInfo.put("yarn_app_url", trackingUrl);
+        } catch (Exception e) {
+            logger.error("get tracking url failed!", e);
+        }
+        return extraInfo;
+    }
+
+    protected void exchangeSparkSession() {
+        exchangeSparkSession(atomicSparkConf.get());
+    }
+
+    protected final void execute() throws Exception {
+        String hdfsMetalUrl = getParam(NBatchConstants.P_DIST_META_URL);
+        jobId = getParam(NBatchConstants.P_JOB_ID);
+        project = getParam(NBatchConstants.P_PROJECT_NAME);
+        if (getParam(NBatchConstants.P_LAYOUT_IDS) != null) {
+            layoutSize = StringUtils.split(getParam(NBatchConstants.P_LAYOUT_IDS), ",").length;
+        }
+        try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig
+                .setAndUnsetThreadLocalConfig(KylinConfig.loadKylinConfigFromHdfs(hdfsMetalUrl))) {
+            config = autoCloseConfig.get();
+            report = (IJobProgressReport) ClassUtil.newInstance(config.getBuildJobProgressReporter());
+            report.initArgsParams(getParams());
+            //// KylinBuildEnv
+            final KylinBuildEnv buildEnv = KylinBuildEnv.getOrCreate(config);
+            atomicBuildEnv.set(buildEnv);
+            infos = buildEnv.buildJobInfos();
+            infos.recordJobId(jobId);
+            infos.recordProject(project);
+            infos.recordJobStepId(System.getProperty("spark.driver.param.taskId", jobId));
+
+            monitorSparkMaster();
+
+            HadoopUtil.setCurrentConfiguration(new Configuration());
+            ////////
+            exchangeSparkConf(buildEnv.sparkConf());
+
+            TimeZoneUtils.setDefaultTimeZone(config);
+
+            /// wait until resource is enough
+            waiteForResource(atomicSparkConf.get(), buildEnv);
+
+            ///
+            logger.info("Prepare job environment");
+            prepareSparkSession();
+
+            /// backwards compatibility
+            ss = getSparkSession();
+            val master = ss.conf().get(SparkLauncher.SPARK_MASTER, "");
+            if (!master.equals("local")) {
+                EnviromentAdaptor adaptor = (EnviromentAdaptor) ClassUtil
+                        .newInstance(config.getBuildJobEnviromentAdaptor());
+                adaptor.prepareEnviroment(ss, params);
+            }
+
+            if (config.useDynamicS3RoleCredentialInTable()) {
+                val tableMetadataManager = NTableMetadataManager.getInstance(config, project);
+                tableMetadataManager.listAllTables().forEach(tableDesc -> SparderEnv
+                        .addS3CredentialFromTableToSpark(tableMetadataManager.getOrCreateTableExt(tableDesc), ss));
+            }
+
+            if (!config.isUTEnv()) {
+                Unsafe.setProperty("kylin.env", config.getDeployEnv());
+            }
+            logger.info("Start job");
+            infos.startJob();
+            // should be invoked after method prepareSparkSession
+            extraInit();
+
+            waiteForResourceSuccess();
+            doExecute();
+            // Output metadata to another folder
+            val resourceStore = ResourceStore.getKylinMetaStore(config);
+            val outputConfig = KylinConfig.createKylinConfig(config);
+            outputConfig.setMetadataUrl(getParam(NBatchConstants.P_OUTPUT_META_URL));
+            MetadataStore.createMetadataStore(outputConfig).dump(resourceStore);
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            if (infos != null) {
+                infos.jobEnd();
+            }
+            destroySparkSession();
+            extraDestroy();
+            executeFinish();
+        }
+    }
+
+    protected void handleException(Exception e) throws Exception {
+        throw e;
+    }
+
+    private SparkSession createSpark(SparkConf sparkConf) {
+        SparkSession.Builder sessionBuilder = SparkSession.builder()
+                .withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(SparkSessionExtensions v1) {
+                        v1.injectPostHocResolutionRule(new AbstractFunction1<SparkSession, Rule<LogicalPlan>>() {
+                            @Override
+                            public Rule<LogicalPlan> apply(SparkSession session) {
+                                return new AlignmentTableStats(session);
+                            }
+                        });
+                        return BoxedUnit.UNIT;
+                    }
+                }).enableHiveSupport().config(sparkConf)
+                .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
+
+        // If this is UT and SparkSession is already created, then use SparkSession.
+        // Otherwise, we always use KylinSession
+        boolean createWithSparkSession = !isJobOnCluster(sparkConf) && SparderEnv.isSparkAvailable();
+        if (createWithSparkSession) {
+            boolean isKylinSession = SparderEnv.getSparkSession() instanceof KylinSession;
+            createWithSparkSession = !isKylinSession;
+        }
+
+        if (createWithSparkSession) {
+            return sessionBuilder.getOrCreate();
+        } else {
+            return KylinSession$.MODULE$.KylinBuilder(sessionBuilder).buildCluster().getOrCreateKylinSession();
+        }
+    }
+
+    public boolean isJobOnCluster(SparkConf conf) {
+        return !Utils.isLocalMaster(conf) && !config.isUTEnv();
+    }
+
+    protected void extraInit() {
+    }
+
+    public void extraDestroy() {
+        if (clusterMonitor != null) {
+            clusterMonitor.shutdown();
+        }
+    }
+
+    protected abstract void doExecute() throws Exception;
+
+    protected void onLayoutFinished(long layoutId) {
+        //do nothing
+    }
+
+    protected void onExecuteFinished() {
+        //do nothing
+    }
+
+    protected String calculateRequiredCores() throws Exception {
+        return SparkJobConstants.DEFAULT_REQUIRED_CORES;
+    }
+
+    private void autoSetSparkConf(SparkConf sparkConf) throws Exception {
+        SparkConfHelper helper = new SparkConfHelper();
+        // copy user defined spark conf
+        if (sparkConf.getAll() != null) {
+            Arrays.stream(sparkConf.getAll()).forEach(config -> helper.setConf(config._1, config._2));
+        }
+        helper.setClusterManager(KylinBuildEnv.get().clusterManager());
+
+        chooseContentSize(helper);
+
+        helper.setOption(SparkConfHelper.LAYOUT_SIZE, Integer.toString(layoutSize));
+        helper.setOption(SparkConfHelper.REQUIRED_CORES, calculateRequiredCores());
+        helper.setConf(COUNT_DISTICT, hasCountDistinct().toString());
+        helper.generateSparkConf();
+        helper.applySparkConf(sparkConf);
+    }
+
+    private void waiteForResource(SparkConf sparkConf, KylinBuildEnv buildEnv) throws Exception {
+        val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
+        infos.recordStageId(waiteForResource.getId());
+        waiteForResource.execute();
+    }
+
+    protected void waiteForResourceSuccess() throws Exception {
+        val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
+        waiteForResource.onStageFinished(true);
+        infos.recordStageId("");
+    }
+
+    protected void executeFinish() {
+        try {
+            getReport().executeFinish(getReportParams(), project, getJobId());
+        } catch (Exception e) {
+            logger.error("executeFinish failed", e);
+        }
+    }
+
+    protected void chooseContentSize(SparkConfHelper helper) {
+        Path shareDir = config.getJobTmpShareDir(project, jobId);
+        // add content size with unit
+        helper.setOption(SparkConfHelper.SOURCE_TABLE_SIZE, chooseContentSize(shareDir));
+    }
+
+    protected boolean checkRangePartitionTableIsExist(NDataModel modelDesc) {
+        return modelDesc.getAllTableRefs().stream().anyMatch(p -> p.getTableDesc().isRangePartition());
+    }
+
+    protected String chooseContentSize(Path shareDir) {
+        // return size with unit
+        return ResourceDetectUtils.getMaxResourceSize(shareDir) + "b";
+    }
+
+    protected Boolean hasCountDistinct() throws IOException {
+        Path countDistinct = new Path(config.getJobTmpShareDir(project, jobId),
+                ResourceDetectUtils.countDistinctSuffix());
+        FileSystem fileSystem = countDistinct.getFileSystem(HadoopUtil.getCurrentConfiguration());
+        Boolean exist;
+        if (fileSystem.exists(countDistinct)) {
+            exist = ResourceDetectUtils.readResourcePathsAs(countDistinct);
+        } else {
+            exist = false;
+            logger.debug("File count_distinct.json doesn't exist, set hasCountDistinct to false.");
+        }
+        logger.debug("Exist count distinct measure: {}", exist);
+        return exist;
+    }
+
+    public void logJobInfo() {
+        try {
+            logger.info(generateInfo());
+            if (KylinConfig.getInstanceFromEnv().skipRecordJobExecutionTime()) {
+                logger.info("skip record job wait and run time");
+                return;
+            }
+            Map<String, String> extraInfo = new HashMap<>();
+            extraInfo.put("yarn_job_wait_time", ((Long) KylinBuildEnv.get().buildJobInfos().waitTime()).toString());
+            extraInfo.put("yarn_job_run_time", ((Long) KylinBuildEnv.get().buildJobInfos().buildTime()).toString());
+
+            getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/wait_and_run_time", project, jobId,
+                    extraInfo);
+        } catch (Exception e) {
+            logger.warn("Error occurred when generate job info.", e);
+        }
+    }
+
+    private Map<String, String> getReportParams() {
+        val reportParams = new HashMap<String, String>();
+        reportParams.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout()));
+        reportParams.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true));
+        return reportParams;
+    }
+
+    protected String generateInfo() {
+        return LogJobInfoUtils.sparkApplicationInfo();
+    }
+
+    public Set<String> getIgnoredSnapshotTables() {
+        return NSparkCubingUtil.toIgnoredTableSet(getParam(NBatchConstants.P_IGNORED_SNAPSHOT_TABLES));
+    }
+
+    protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
+        return config.getSparkConfigOverride();
+    }
+
+    protected void checkDateFormatIfExist(String project, String modelId) throws Exception {
+        if (config.isUTEnv()) {
+            return;
+        }
+        val modelManager = NDataModelManager.getInstance(config, project);
+        NDataModel modelDesc = modelManager.getDataModelDesc(modelId);
+        if (checkRangePartitionTableIsExist(modelDesc)) {
+            logger.info("Range partitioned tables do not support pushdown, so do not need to perform subsequent logic");
+            return;
+        }
+
+        val partitionDesc = modelDesc.getPartitionDesc();
+        if (PartitionDesc.isEmptyPartitionDesc(partitionDesc)
+                || org.apache.commons.lang.StringUtils.isEmpty(partitionDesc.getPartitionDateFormat()))
+            return;
+
+        if (CatalogTableType.VIEW().name().equals(modelDesc.getRootFactTable().getTableDesc().getTableType()))
+            return;
+
+        String partitionColumn = modelDesc.getPartitionDesc().getPartitionDateColumnRef().getExpressionInSourceDB();
+
+        SparkSession sparkSession = atomicSparkSession.get();
+        try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance()
+                .overrideSparkSession(sparkSession)) {
+            String dateString = PushDownUtil.getFormatIfNotExist(modelDesc.getRootFactTableName(), partitionColumn,
+                    project);
+            val sdf = new SimpleDateFormat(modelDesc.getPartitionDesc().getPartitionDateFormat(),
+                    Locale.getDefault(Locale.Category.FORMAT));
+            val date = sdf.parse(dateString);
+            if (date == null || !dateString.equals(sdf.format(date))) {
+                throw new NoRetryException("date format not match");
+            }
+        } catch (KylinException ignore) {
+            // ignore it when pushdown return empty row
+        } catch (ParseException | NoRetryException e) {
+            throw new NoRetryException("date format not match");
+        }
+    }
+
+    private void exchangeSparkConf(SparkConf sparkConf) throws Exception {
+        if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) {
+            Map<String, String> baseSparkConf = getSparkConfigOverride(config);
+            if (!baseSparkConf.isEmpty()) {
+                baseSparkConf.forEach(sparkConf::set);
+                String baseSparkConfStr = JsonUtil.writeValueAsString(baseSparkConf);
+                logger.info("Override user-defined spark conf: {}", baseSparkConfStr);
+            }
+            if (config.isAutoSetSparkConf()) {
+                logger.info("Set spark conf automatically.");
+                try {
+                    autoSetSparkConf(sparkConf);
+                } catch (Exception e) {
+                    logger.warn("Auto set spark conf failed. Load spark conf from system properties", e);
+                }
+            }
+        }
+
+        atomicSparkConf.set(sparkConf);
+    }
+
+    private void exchangeSparkSession(SparkConf sparkConf) {
+        SparkSession sparkSession = atomicSparkSession.get();
+        if (Objects.nonNull(sparkSession)) {
+            // destroy previous spark session
+            destroySparkSession();
+        }
+
+        sparkSession = createSpark(sparkConf);
+        if (!config.isUTEnv() && !sparkConf.get("spark.master").startsWith("k8s")) {
+            getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/spark", project, jobId,
+                    getTrackingInfo(sparkSession, config.isTrackingUrlIpAddressEnabled()));
+        }
+
+        // for spark metrics
+        JobMetricsUtils.registerListener(sparkSession);
+        SparderEnv.registerListener(sparkSession.sparkContext());
+
+        //#8341
+        SparderEnv.setSparkSession(sparkSession);
+        UdfManager.create(sparkSession);
+
+        ///
+        atomicSparkSession.set(sparkSession);
+    }
+
+    private void prepareSparkSession() throws NoRetryException {
+        SparkConf sparkConf = atomicSparkConf.get();
+        if (Objects.isNull(sparkConf)) {
+            // shouldn't reach here
+            throw new NoRetryException("spark conf shouldn't be null");
+        }
+
+        /// SegmentBuildJob only!!!
+        if (config.isSnapshotSpecifiedSparkConf() && (this instanceof SegmentBuildJob)) {
+            // snapshot specified spark conf, based on the exchanged spark conf.
+            SparkConf clonedSparkConf = sparkConf.clone();
+            Map<String, String> snapshotSparkConf = config.getSnapshotBuildingConfigOverride();
+            snapshotSparkConf.forEach(clonedSparkConf::set);
+            logger.info("exchange sparkSession using snapshot specified sparkConf");
+            exchangeSparkSession(clonedSparkConf);
+            return;
+        }
+        // normal logic
+        exchangeSparkSession(sparkConf);
+    }
+
+    private void destroySparkSession() {
+        SparkSession sparkSession = atomicSparkSession.get();
+        if (Objects.isNull(sparkSession)) {
+            logger.info("no initialized sparkSession instance");
+            return;
+        }
+        if (sparkSession.conf().get("spark.master").startsWith("local")) {
+            // for UT use? but very strange for resource detect mode (spark local).
+            return;
+        }
+        JobMetricsUtils.unRegisterListener(sparkSession);
+        sparkSession.stop();
+    }
+
+    private void monitorSparkMaster() {
+        clusterMonitor = new ClusterMonitor();
+        clusterMonitor.monitorSparkMaster(atomicBuildEnv, atomicSparkSession, atomicDisconnectSparkMasterTimes,
+                atomicUnreachableSparkMaster);
+    }
+
+}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java
new file mode 100644
index 0000000000..31974f65bc
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.application;
+
+import org.apache.spark.application.JobWorkSpace;
+
+public class SparkEntry {
+    public static void main(String[] args) {
+        JobWorkSpace.execute(args);
+    }
+}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
new file mode 100644
index 0000000000..5411c31bbd
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import static org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker.isNeedCreateHiveTemporaryTable;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.engine.spark.NSparkCubingEngine;
+import org.apache.kylin.engine.spark.job.KylinBuildEnv;
+import org.apache.kylin.engine.spark.utils.HiveTransactionTableHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.SparderTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+public class NSparkCubingSourceInput implements NSparkCubingEngine.NSparkCubingSource {
+    private static final Logger logger = LoggerFactory.getLogger(NSparkCubingSourceInput.class);
+
+    @Override
+    public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> params) {
+        ColumnDesc[] columnDescs = table.getColumns();
+        List<String> tblColNames = Lists.newArrayListWithCapacity(columnDescs.length);
+        StructType kylinSchema = new StructType();
+        for (ColumnDesc columnDesc : columnDescs) {
+            if (!columnDesc.isComputedColumn()) {
+                kylinSchema = kylinSchema.add(columnDesc.getName(),
+                        SparderTypeUtil.toSparkType(columnDesc.getType(), false), true);
+                tblColNames.add("`" + columnDesc.getName() + "`");
+            }
+        }
+        String[] colNames = tblColNames.toArray(new String[0]);
+        String colString = Joiner.on(",").join(colNames);
+        String sql;
+        KylinConfig kylinConfig = KylinBuildEnv.get().kylinConfig();
+        logger.info("isRangePartition:{};isTransactional:{};isReadTransactionalTableEnabled:{}",
+                table.isRangePartition(), table.isTransactional(), kylinConfig.isReadTransactionalTableEnabled());
+        if (isNeedCreateHiveTemporaryTable(table.isRangePartition(), table.isTransactional(),
+                kylinConfig.isReadTransactionalTableEnabled())) {
+            sql = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(table, params, colString,
+                    KylinBuildEnv.get());
+        } else {
+            sql = String.format(Locale.ROOT, "select %s from %s", colString, table.getIdentity());
+        }
+        Dataset<Row> df = ss.sql(sql);
+        StructType sparkSchema = df.schema();
+        logger.debug("Source data sql is: {}", sql);
+        logger.debug("Kylin schema: {}", kylinSchema.treeString());
+        return df.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema));
+    }
+}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java
new file mode 100644
index 0000000000..6a501bc71e
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.source;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.ISampleDataDeployer;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+import org.apache.kylin.engine.spark.NSparkCubingEngine;
+
+public class NSparkDataSource implements ISource {
+    // for reflection
+    public NSparkDataSource(KylinConfig config) {
+
+    }
+
+    @Override
+    public ISourceMetadataExplorer getSourceMetadataExplorer() {
+        return new NSparkMetadataExplorer();
+    }
+
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface == NSparkCubingEngine.NSparkCubingSource.class) {
+            return (I) new NSparkCubingSourceInput();
+        } else {
+            throw new IllegalArgumentException("Unsupported engine interface: " + engineInterface);
+        }
+    }
+
+    @Override
+    public IReadableTable createReadableTable(TableDesc tableDesc) {
+        return new NSparkTable(tableDesc);
+    }
+
+    @Override
+    public SegmentRange enrichSourcePartitionBeforeBuild(IBuildable buildable, SegmentRange srcPartition) {
+        return srcPartition;
+    }
+
+    @Override
+    public ISampleDataDeployer getSampleDataDeployer() {
+        return new NSparkMetadataExplorer();
+    }
+
+    @Override
+    public SegmentRange getSegmentRange(String start, String end) {
+        start = StringUtils.isEmpty(start) ? "0" : start;
+        end = StringUtils.isEmpty(end) ? "" + Long.MAX_VALUE : end;
+        return new SegmentRange.TimePartitionedSegmentRange(Long.parseLong(start), Long.parseLong(end));
+    }
+
+    @Override
+    public boolean supportBuildSnapShotByPartition() {
+        return true;
+    }
+
+}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
new file mode 100644
index 0000000000..7e4f7deea0
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kylin.common.KapConfig;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.source.ISampleDataDeployer;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Database;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.internal.SQLConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.clearspring.analytics.util.Lists;
+import com.google.common.collect.Sets;
+
+import lombok.val;
+
+public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleDataDeployer, Serializable {
+
+    private static final Logger logger = LoggerFactory.getLogger(NSparkMetadataExplorer.class);
+
+    public static String generateCreateSchemaSql(String schemaName) {
+        return String.format(Locale.ROOT, "CREATE DATABASE IF NOT EXISTS %s", schemaName);
+    }
+
+    public static String[] generateCreateTableSql(TableDesc tableDesc) {
+        String dropSql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity();
+
+        StringBuilder ddl = new StringBuilder();
+        ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n");
+        ddl.append("(" + "\n");
+
+        for (int i = 0; i < tableDesc.getColumns().length; i++) {
+            ColumnDesc col = tableDesc.getColumns()[i];
+            if (i > 0) {
+                ddl.append(",");
+            }
+            ddl.append(col.getName() + " " + col.getDatatype() + "\n");
+        }
+
+        ddl.append(")" + "\n");
+        ddl.append("USING com.databricks.spark.csv");
+
+        return new String[] { dropSql, ddl.toString() };
+    }
+
+    public NSparkTableMetaExplorer getTableMetaExplorer() {
+        return new NSparkTableMetaExplorer();
+    }
+
+    @Override
+    public List<String> listDatabases() throws Exception {
+        Dataset<Row> dataset = SparderEnv.getSparkSession().sql("show databases").select("namespace");
+        return dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<String> listTables(String database) throws Exception {
+        val ugi = UserGroupInformation.getCurrentUser();
+        val config = KylinConfig.getInstanceFromEnv();
+        val spark = SparderEnv.getSparkSession();
+
+        List<String> tables = Lists.newArrayList();
+        try {
+            String sql = "show tables";
+            if (StringUtils.isNotBlank(database)) {
+                sql = String.format(Locale.ROOT, sql + " in %s", database);
+            }
+            Dataset<Row> dataset = SparderEnv.getSparkSession().sql(sql).select("tableName");
+            tables = dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList());
+
+            if (config.getTableAccessFilterEnable() && config.getKerberosProjectLevelEnable()
+                    && UserGroupInformation.isSecurityEnabled()) {
+                List<String> accessTables = Lists.newArrayList();
+                for (String table : tables) {
+                    val tableName = database + "." + table;
+                    if (checkTableAccess(tableName)) {
+                        accessTables.add(table);
+                    }
+                }
+                return accessTables;
+            }
+        } catch (Exception e) {
+            logger.error("List hive tables failed. user: {}, db: {}", ugi.getUserName(), database);
+        }
+
+        return tables;
+    }
+
+    public boolean checkTableAccess(String tableName) {
+        boolean isAccess = true;
+        try {
+            val spark = SparderEnv.getSparkSession();
+            val sparkTable = spark.catalog().getTable(tableName);
+            Set<String> needCheckTables = Sets.newHashSet();
+            if (sparkTable.tableType().equals(CatalogTableType.VIEW().name())) {
+                needCheckTables = SparkSqlUtil.getViewOrignalTables(tableName, SparderEnv.getSparkSession());
+            } else {
+                needCheckTables.add(tableName);
+            }
+            String hiveSpecFsLocation = spark.sessionState().conf().getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
+            FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem()
+                    : HadoopUtil.getFileSystem(hiveSpecFsLocation);
+            for (String table : needCheckTables) {
+                fs.listStatus(new Path(getLoc(spark, table, hiveSpecFsLocation)));
+            }
+        } catch (Exception e) {
+            isAccess = false;
+            try {
+                logger.error("Read hive table {} error:{}, ugi name: {}.", tableName, e.getMessage(),
+                        UserGroupInformation.getCurrentUser().getUserName());
+            } catch (IOException ex) {
+                logger.error("fetch user curr ugi info error.", e);
+            }
+        }
+        return isAccess;
+    }
+
+    @Override
+    public Pair<TableDesc, TableExtDesc> loadTableMetadata(final String database, String tableName, String prj)
+            throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        NTableMetadataManager metaMgr = NTableMetadataManager.getInstance(config, prj);
+
+        NSparkTableMeta tableMeta = getTableMetaExplorer().getSparkTableMeta(database, tableName);
+        TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
+
+        // make a new TableDesc instance, don't modify the one in use
+        if (tableDesc == null) {
+            tableDesc = new TableDesc();
+            tableDesc.setDatabase(database.toUpperCase(Locale.ROOT));
+            tableDesc.setName(tableName.toUpperCase(Locale.ROOT));
+            tableDesc.setUuid(RandomUtil.randomUUIDStr());
+            tableDesc.setLastModified(0);
+        } else {
+            tableDesc = new TableDesc(tableDesc);
+        }
+
+        if (tableMeta.tableType != null) {
+            tableDesc.setTableType(tableMeta.tableType);
+        }
+        //set table type = spark
+        tableDesc.setSourceType(ISourceAware.ID_SPARK);
+        tableDesc.setTransactional(tableMeta.isTransactional);
+        tableDesc.setRangePartition(tableMeta.isRangePartition);
+
+        Set<String> partColumnSet = Optional.ofNullable(tableMeta.partitionColumns) //
+                .orElseGet(Collections::emptyList).stream().map(field -> field.name) //
+                .collect(Collectors.toSet());
+        int columnNumber = tableMeta.allColumns.size();
+        List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
+        for (int i = 0; i < columnNumber; i++) {
+            NSparkTableMeta.SparkTableColumnMeta field = tableMeta.allColumns.get(i);
+            ColumnDesc cdesc = new ColumnDesc();
+            cdesc.setName(field.name.toUpperCase(Locale.ROOT));
+            cdesc.setCaseSensitiveName(field.name);
+            // use "double" in kylin for "float"
+            if ("float".equalsIgnoreCase(field.dataType)) {
+                cdesc.setDatatype("double");
+            } else {
+                cdesc.setDatatype(field.dataType);
+            }
+            cdesc.setId(String.valueOf(i + 1));
+            cdesc.setComment(field.comment);
+            cdesc.setPartitioned(partColumnSet.contains(field.name));
+            columns.add(cdesc);
+        }
+        tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
+        List<String> partCols = tableMeta.partitionColumns.stream().map(col -> col.name).collect(Collectors.toList());
+        if (!partCols.isEmpty()) {
+            tableDesc.setPartitionColumn(partCols.get(0).toUpperCase(Locale.ROOT));
+        } else {
+            tableDesc.setPartitionColumn(null);
+        }
+        StringBuilder partitionColumnBuilder = new StringBuilder();
+        for (int i = 0, n = tableMeta.partitionColumns.size(); i < n; i++) {
+            if (i > 0)
+                partitionColumnBuilder.append(", ");
+            partitionColumnBuilder.append(tableMeta.partitionColumns.get(i).name.toUpperCase(Locale.ROOT));
+        }
+
+        TableExtDesc tableExtDesc = new TableExtDesc();
+        tableExtDesc.setIdentity(tableDesc.getIdentity());
+        tableExtDesc.setUuid(RandomUtil.randomUUIDStr());
+        tableExtDesc.setLastModified(0);
+        tableExtDesc.init(prj);
+
+        tableExtDesc.addDataSourceProp(TableExtDesc.LOCATION_PROPERTY_KEY, tableMeta.sdLocation);
+        tableExtDesc.addDataSourceProp("owner", tableMeta.owner);
+        tableExtDesc.addDataSourceProp("create_time", tableMeta.createTime);
+        tableExtDesc.addDataSourceProp("last_access_time", 
tableMeta.lastAccessTime);
+        tableExtDesc.addDataSourceProp("partition_column", 
partitionColumnBuilder.toString());
+        tableExtDesc.addDataSourceProp("total_file_size", 
String.valueOf(tableMeta.fileSize));
+        tableExtDesc.addDataSourceProp("total_file_number", 
String.valueOf(tableMeta.fileNum));
+        tableExtDesc.addDataSourceProp("hive_inputFormat", 
tableMeta.sdInputFormat);
+        tableExtDesc.addDataSourceProp("hive_outputFormat", 
tableMeta.sdOutputFormat);
+        tableExtDesc.addDataSourceProp(TableExtDesc.S3_ROLE_PROPERTY_KEY, 
tableMeta.s3Role);
+        tableExtDesc.addDataSourceProp(TableExtDesc.S3_ENDPOINT_KEY, 
tableMeta.s3Endpoint);
+        return Pair.newPair(tableDesc, tableExtDesc);
+    }
+
+    @Override
+    public List<String> getRelatedKylinResources(TableDesc table) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean checkDatabaseAccess(String database) throws Exception {
+        boolean hiveDBAccessFilterEnable = KapConfig.getInstanceFromEnv().getDBAccessFilterEnable();
+        if (hiveDBAccessFilterEnable) {
+            logger.info("Check database {} access start.", database);
+            try {
+                Database db = SparderEnv.getSparkSession().catalog().getDatabase(database);
+            } catch (AnalysisException e) {
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+                logger.info("The current user: {} does not have permission to access database {}", ugi.getUserName(),
+                        database);
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public boolean checkTablesAccess(Set<String> tables) {
+        return tables.stream().allMatch(this::checkTableAccess);
+    }
+
+    @Override
+    public Set<String> getTablePartitions(String database, String table, String prj, String partCol) {
+        return getTableMetaExplorer().checkAndGetTablePartitions(database, table, partCol);
+    }
+
+    @Override
+    public void createSampleDatabase(String database) throws Exception {
+        SparderEnv.getSparkSession().sql(generateCreateSchemaSql(database));
+    }
+
+    @Override
+    public void createSampleTable(TableDesc table) throws Exception {
+        String[] createTableSqls = generateCreateTableSql(table);
+        for (String sql : createTableSqls) {
+            SparderEnv.getSparkSession().sql(sql);
+        }
+    }
+
+    @Override
+    public void loadSampleData(String tableName, String tableFileDir) throws Exception {
+        Dataset<Row> dataset = SparderEnv.getSparkSession().read().csv(tableFileDir + "/" + tableName + ".csv").toDF();
+        if (tableName.indexOf(".") > 0) {
+            tableName = tableName.substring(tableName.indexOf(".") + 1);
+        }
+        dataset.createOrReplaceTempView(tableName);
+    }
+
+    @Override
+    public void createWrapperView(String origTableName, String viewName) throws Exception {
+        throw new UnsupportedOperationException("create wrapper view is not supported");
+    }
+
+    public String getLoc(SparkSession spark, String table, String hiveSpecFsLocation) {
+        String loc = spark.sql("desc formatted " + table).where("col_name == 'Location'").head().getString(1);
+        if (null == hiveSpecFsLocation || null == loc) {
+            return loc;
+        }
+        return loc.replace("hdfs://hacluster", hiveSpecFsLocation);
+    }
+}
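
The access check above leans on Spark's catalog API: Catalog.getDatabase throws AnalysisException when the database is missing or not visible to the current user. A minimal standalone sketch of that same probe pattern (the helper class and names below are illustrative, not part of this commit):

    import org.apache.spark.sql.AnalysisException;
    import org.apache.spark.sql.SparkSession;

    final class DatabaseAccessProbe {
        // Probe-only check: try to resolve the database through the Spark catalog
        // and treat an AnalysisException as "does not exist / no access".
        static boolean canAccess(SparkSession spark, String dbName) {
            try {
                spark.catalog().getDatabase(dbName);
                return true;
            } catch (AnalysisException e) {
                return false;
            }
        }
    }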
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java
new file mode 100644
index 0000000000..6ab6643154
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.source;
+
+import java.io.IOException;
+import java.util.Locale;
+
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NSparkTable implements IReadableTable {
+
+    private static final Logger logger = LoggerFactory.getLogger(NSparkTable.class);
+
+    private final String database;
+    private final String tableName;
+
+    public NSparkTable(TableDesc tableDesc) {
+        this.database = tableDesc.getDatabase();
+        this.tableName = tableDesc.getName();
+    }
+
+    @Override
+    public TableReader getReader() throws IOException {
+        return new NSparkTableReader(database, tableName);
+    }
+
+    @Override
+    public TableSignature getSignature() throws IOException {
+        // TODO: 07/12/2017 get modify time
+        String path = String.format(Locale.ROOT, "%s.%s", database, tableName);
+        long lastModified = System.currentTimeMillis(); // assume table is ever changing
+        int size = 0;
+        return new TableSignature(path, size, lastModified);
+    }
+
+    @Override
+    public boolean exists() {
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "spark: database=[" + database + "], table=[" + tableName + "]";
+    }
+
+}
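
A minimal sketch of how NSparkTable might be wired from a TableDesc (the database and table names are illustrative, and TableReader is assumed to be the nested type of IReadableTable used above):

    import java.io.IOException;

    import org.apache.kylin.metadata.model.TableDesc;
    import org.apache.kylin.source.IReadableTable;

    final class NSparkTableSketch {
        // The signature always reports size 0 and lastModified = "now",
        // i.e. the source table is treated as ever changing.
        static IReadableTable.TableReader openReader() throws IOException {
            TableDesc desc = new TableDesc();
            desc.setDatabase("DEFAULT");      // illustrative
            desc.setName("SAMPLE_TABLE");     // illustrative
            IReadableTable table = new NSparkTable(desc);
            return table.getReader();         // backed by NSparkTableReader
        }
    }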
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java
new file mode 100644
index 0000000000..29f48bfbfc
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import java.util.List;
+
+public class NSparkTableMeta {
+    public static class SparkTableColumnMeta {
+        String name;
+        String dataType;
+        String comment;
+
+        public SparkTableColumnMeta(String name, String dataType, String comment) {
+            this.name = name;
+            this.dataType = dataType;
+            this.comment = comment;
+        }
+
+        @Override
+        public String toString() {
+            return "SparkTableColumnMeta{" + "name='" + name + '\'' + ", 
dataType='" + dataType + '\'' + ", comment='"
+                    + comment + '\'' + '}';
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public String getDataType() {
+            return dataType;
+        }
+
+        public String getComment() {
+            return comment;
+        }
+    }
+
+    String tableName;
+    String sdLocation;//sd is short for storage descriptor
+    String sdInputFormat;
+    String sdOutputFormat;
+    String owner;
+    String provider;
+    String tableType;
+    String createTime;
+    String lastAccessTime;
+    long fileSize;
+    long fileNum;
+    boolean isNative;
+    List<SparkTableColumnMeta> allColumns;
+    List<SparkTableColumnMeta> partitionColumns;
+    boolean isTransactional;
+    boolean isRangePartition;
+    String s3Role;
+    String s3Endpoint;
+
+    public List<SparkTableColumnMeta> getAllColumns() {
+        return allColumns;
+    }
+
+    public NSparkTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat,
+            String owner, String provider, String tableType, String createTime, String lastAccessTime, long fileSize,
+            long fileNum, boolean isNative, List<SparkTableColumnMeta> allColumns,
+            List<SparkTableColumnMeta> partitionColumns, boolean isTransactional, boolean isRangePartition,
+            String s3Role, String s3Endpoint) {
+        this.tableName = tableName;
+        this.sdLocation = sdLocation;
+        this.sdInputFormat = sdInputFormat;
+        this.sdOutputFormat = sdOutputFormat;
+        this.owner = owner;
+        this.provider = provider;
+        this.tableType = tableType;
+        this.createTime = createTime;
+        this.lastAccessTime = lastAccessTime;
+        this.fileSize = fileSize;
+        this.fileNum = fileNum;
+        this.isNative = isNative;
+        this.allColumns = allColumns;
+        this.partitionColumns = partitionColumns;
+        this.isTransactional = isTransactional;
+        this.isRangePartition = isRangePartition;
+        this.s3Role = s3Role;
+        this.s3Endpoint = s3Endpoint;
+    }
+
+    @Override
+    public String toString() {
+        return "SparkTableMeta{" + "tableName='" + tableName + '\'' + ", 
sdLocation='" + sdLocation + '\''
+                + ", sdInputFormat='" + sdInputFormat + '\'' + ", 
sdOutputFormat='" + sdOutputFormat + '\''
+                + ", owner='" + owner + ", provider='" + provider + '\'' + ", 
tableType='" + tableType
+                + ", createTime='" + createTime + '\'' + ", lastAccessTime=" + 
lastAccessTime + ", fileSize=" + fileSize
+                + ", fileNum=" + fileNum + ", isNative=" + isNative + ", 
allColumns=" + allColumns
+                + ", partitionColumns=" + partitionColumns + ", 
isTransactional=" + isTransactional
+                + ", isRangePartition=" + isRangePartition + ", s3Role=" + 
s3Role + ", s3Endpoint=" + s3Endpoint + '}';
+    }
+}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java
new file mode 100644
index 0000000000..2d4a9eeae1
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.source;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+public class NSparkTableMetaBuilder {
+    private String tableName;
+    private String sdLocation;
+    private String sdInputFormat;
+    private String sdOutputFormat;
+    private String owner;
+    private String provider;
+    private String tableType;
+    private String createTime;
+    private String lastAccessTime;
+    private long fileSize;
+    private long fileNum;
+    private boolean isNative = true;
+    private List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayList();
+    private List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns = Lists.newArrayList();
+    private boolean isTransactional = false;
+    private boolean isRangePartition = false;
+    private String s3Role;
+    private String s3Endpoint;
+
+    public NSparkTableMetaBuilder setTableName(String tableName) {
+        this.tableName = tableName;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setSdLocation(String sdLocation) {
+        this.sdLocation = sdLocation;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setSdInputFormat(String sdInputFormat) {
+        this.sdInputFormat = sdInputFormat;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setSdOutputFormat(String sdOutputFormat) {
+        this.sdOutputFormat = sdOutputFormat;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setOwner(String owner) {
+        this.owner = owner;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setProvider(String provider) {
+        this.provider = provider;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setTableType(String tableType) {
+        this.tableType = tableType;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setCreateTime(String createTime) {
+        this.createTime = createTime;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setLastAccessTime(String lastAccessTime) {
+        this.lastAccessTime = lastAccessTime;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setFileSize(long fileSize) {
+        this.fileSize = fileSize;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setFileNum(long fileNum) {
+        this.fileNum = fileNum;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setIsNative(boolean isNative) {
+        this.isNative = isNative;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setAllColumns(List<NSparkTableMeta.SparkTableColumnMeta> allColumns) {
+        this.allColumns = allColumns;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setPartitionColumns(List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns) {
+        this.partitionColumns = partitionColumns;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setIsTransactional(boolean isTransactional) {
+        this.isTransactional = isTransactional;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setIsRangePartition(boolean isRangePartition) {
+        this.isRangePartition = isRangePartition;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setS3Role(String s3Role) {
+        this.s3Role = s3Role;
+        return this;
+    }
+
+    public NSparkTableMetaBuilder setS3Endpoint(String s3Endpoint) {
+        this.s3Endpoint = s3Endpoint;
+        return this;
+    }
+
+    public NSparkTableMeta createSparkTableMeta() {
+        return new NSparkTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, provider, tableType,
+                createTime, lastAccessTime, fileSize, fileNum, isNative, allColumns, partitionColumns, isTransactional,
+                isRangePartition, s3Role, s3Endpoint);
+    }
+}
\ No newline at end of file
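
A minimal sketch of the builder's fluent usage (all values are illustrative only; in practice the columns come from NSparkTableMetaExplorer):

    NSparkTableMeta meta = new NSparkTableMetaBuilder()
            .setTableName("sample_table")   // illustrative
            .setOwner("admin")              // illustrative
            .setTableType("MANAGED")
            .setFileSize(1024L)
            .setFileNum(1L)
            .setIsTransactional(false)
            .createSparkTableMeta();        // unset fields keep the defaults declared above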
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
new file mode 100644
index 0000000000..033edb84f9
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.jnet.Installer;
+import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import scala.Option;
+import scala.collection.JavaConversions;
+
+public class NSparkTableMetaExplorer implements Serializable {
+
+    private static final Logger logger = LoggerFactory.getLogger(NSparkTableMetaExplorer.class);
+
+    enum PROVIDER {
+        HIVE("hive"), UNSPECIFIED("");
+
+        private static final PROVIDER[] ALL = new PROVIDER[] { HIVE };
+        private String value;
+
+        PROVIDER(String value) {
+            this.value = value;
+        }
+
+        public static PROVIDER fromString(Option<String> value) {
+            if (value.isEmpty()) {
+                return UNSPECIFIED;
+            }
+
+            for (PROVIDER provider : ALL) {
+                if (provider.value.equals(value.get())) {
+                    return provider;
+                }
+            }
+            return UNSPECIFIED;
+        }
+    }
+
+    private static final List<String> UNSUPOORT_TYPE = Lists.newArrayList("array", "map", "struct", "binary");
+    private static final String CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING";
+    public static final String S3_ROLE_PROPERTY_KEY = "role";
+    public static final String S3_ENDPOINT_PROPERTY_KEY = "s3_endpoint";
+
+    public NSparkTableMeta getSparkTableMeta(String database, String tableName) {
+        SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog();
+        TableIdentifier tableIdentifier = TableIdentifier.apply(tableName,
+                Option.apply(database.isEmpty() ? null : database));
+        CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
+        checkTableIsValid(tableMetadata, tableIdentifier, tableName);
+        return getSparkTableMeta(tableName, tableMetadata);
+    }
+
+    public Set<String> checkAndGetTablePartitions(String database, String tableName, String partitionCol) {
+        SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog();
+        TableIdentifier tableIdentifier = TableIdentifier.apply(tableName,
+                Option.apply(database.isEmpty() ? null : database));
+
+        CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
+
+        String firstPartCol = tableMetadata.partitionColumnNames().isEmpty() ? null
+                : tableMetadata.partitionColumnNames().head().toLowerCase(Locale.ROOT);
+
+        if (!partitionCol.equalsIgnoreCase(firstPartCol)) {
+            throw new IllegalArgumentException(
+                    String.format(Locale.ROOT, "table partition col %s not match col %s", firstPartCol, partitionCol));
+        }
+        return JavaConversions.seqAsJavaList(catalog.listPartitions(tableIdentifier, Option.empty())).stream()
+                .map(item -> JavaConversions.mapAsJavaMap(item.spec()).entrySet().stream()
+                        .filter(entry -> partitionCol.equalsIgnoreCase(entry.getKey())) //
+                        .findFirst() //
+                        .map(Map.Entry::getValue) //
+                        .orElse(null))
+                .filter(Objects::nonNull).collect(Collectors.toSet());
+    }
+
+    private NSparkTableMeta getSparkTableMeta(String tableName, CatalogTable tableMetadata) {
+        NSparkTableMetaBuilder builder = new NSparkTableMetaBuilder();
+        builder.setTableName(tableName);
+        builder.setAllColumns(getColumns(tableMetadata, tableMetadata.schema()));
+        builder.setOwner(tableMetadata.owner());
+        builder.setCreateTime(tableMetadata.createTime() + "");
+        builder.setLastAccessTime(tableMetadata.lastAccessTime() + "");
+        builder.setTableType(tableMetadata.tableType().name());
+        builder.setPartitionColumns(getColumns(tableMetadata, tableMetadata.partitionSchema()));
+        builder.setIsRangePartition(isRangePartition(tableMetadata));
+        if (tableMetadata.storage().inputFormat().isDefined()) {
+            builder.setSdInputFormat(tableMetadata.storage().inputFormat().get());
+        }
+        if (tableMetadata.storage().outputFormat().isDefined()) {
+            builder.setSdOutputFormat(tableMetadata.storage().outputFormat().get());
+        }
+        Option<URI> uriOption = tableMetadata.storage().locationUri();
+        if (uriOption.isDefined()) {
+            builder.setSdLocation(uriOption.get().toString());
+        }
+        if (tableMetadata.provider().isDefined()) {
+            builder.setProvider(tableMetadata.provider().get());
+        }
+        if (tableMetadata.properties().contains("totalSize")) {
+            builder.setFileSize(Long.parseLong(tableMetadata.properties().get("totalSize").get()));
+        }
+        if (tableMetadata.properties().contains("numFiles")) {
+            builder.setFileNum(Long.parseLong(tableMetadata.properties().get("numFiles").get()));
+        }
+        if (tableMetadata.properties().contains("transactional")) {
+            builder.setIsTransactional(Boolean.parseBoolean(tableMetadata.properties().get("transactional").get()));
+        }
+        if (tableMetadata.properties().contains(S3_ROLE_PROPERTY_KEY)) {
+            builder.setS3Role(tableMetadata.properties().get(S3_ROLE_PROPERTY_KEY).get());
+        }
+
+        if (tableMetadata.properties().contains(S3_ENDPOINT_PROPERTY_KEY)) {
+            builder.setS3Endpoint(tableMetadata.properties().get(S3_ENDPOINT_PROPERTY_KEY).get());
+        }
+        return builder.createSparkTableMeta();
+    }
+
+    private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema) {
+        return getColumns(tableMetadata, schema, true);
+    }
+
+    private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema,
+            boolean isCheckRepeatColumn) {
+        List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayListWithCapacity(schema.size());
+        Set<String> columnCacheTemp = Sets.newHashSet();
+        for (org.apache.spark.sql.types.StructField field : schema.fields()) {
+            String type = field.dataType().simpleString();
+            if (field.metadata().contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) {
+                type = field.metadata().getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY);
+            }
+            String finalType = type;
+            if (UNSUPOORT_TYPE.stream().anyMatch(finalType::contains)) {
+                logger.info("Load table {} ignore column {}:{}", tableMetadata.identifier().identifier(), field.name(),
+                        finalType);
+                continue;
+            }
+            if (isCheckRepeatColumn && columnCacheTemp.contains(field.name())) {
+                logger.info("The【{}】column is already included and does not need to be added again", field.name());
+                continue;
+            }
+            columnCacheTemp.add(field.name());
+            allColumns.add(new NSparkTableMeta.SparkTableColumnMeta(field.name(), type,
+                    field.getComment().isDefined() ? field.getComment().get() : null));
+        }
+
+        return allColumns;
+    }
+
+    private void checkTableIsValid(CatalogTable tableMetadata, TableIdentifier tableIdentifier, String tableName) {
+        if (CatalogTableType.VIEW().equals(tableMetadata.tableType())) {
+            try {
+                Installer.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
+                SparderEnv.getSparkSession().table(tableIdentifier).queryExecution().analyzed();
+            } catch (Throwable e) {
+                logger.error("Error for parser view: " + tableName, e);
+                throw new RuntimeException("Error for parser view: " + tableName + ", " + e.getMessage()
+                        + "(There are maybe syntactic differences between HIVE and SparkSQL)", e);
+            }
+        }
+    }
+
+    private Boolean isRangePartition(CatalogTable tableMetadata) {
+        List<NSparkTableMeta.SparkTableColumnMeta> allColumns = getColumns(tableMetadata, tableMetadata.schema(),
+                false);
+        return allColumns.stream().collect(Collectors.groupingBy(p -> p.name)).values().stream()
+                .anyMatch(p -> p.size() > 1);
+    }
+}
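
A minimal sketch of querying the explorer (the database and table names are illustrative; a live SparderEnv SparkSession is required):

    NSparkTableMeta meta = new NSparkTableMetaExplorer().getSparkTableMeta("default", "sample_table");
    for (NSparkTableMeta.SparkTableColumnMeta col : meta.getAllColumns()) {
        // char/varchar types are restored from the __CHAR_VARCHAR_TYPE_STRING metadata key;
        // array/map/struct/binary columns are skipped on load, so they never appear here
        System.out.println(col.getName() + " : " + col.getDataType());
    }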
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java
new file mode 100644
index 0000000000..669f418c9c
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.kylin.source.IReadableTable.TableReader;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructField;
+
+public class NSparkTableReader implements TableReader {
+    private String dbName;
+    private String tableName;
+    private SparkSession ss;
+    private List<Row> records;
+    private Iterator<Row> iterator;
+    private Row currentRow;
+
+    public NSparkTableReader(String dbName, String tableName) {
+        this.dbName = dbName;
+        this.tableName = tableName;
+        initialize();
+    }
+
+    public static String[] getRowAsStringArray(Row record) {
+        StructField[] fields = record.schema().fields();
+        String[] arr = new String[fields.length];
+        for (int i = 0; i < arr.length; i++) {
+            Object o = record.get(i);
+            arr[i] = (o == null) ? null : o.toString();
+        }
+        return arr;
+    }
+
+    private void initialize() {
+        ss = SparderEnv.getSparkSession();
+        String master = ss.sparkContext().master();
+        String tableIdentity = tableName;
+        // Spark SQL cannot use a database prefix for a temp view created from CSV (local mode),
+        // but when working with Hive the table needs the database prefix.
+        if (!master.toLowerCase(Locale.ROOT).contains("local")) {
+            tableIdentity = String.format(Locale.ROOT, "%s.%s", dbName, tableName);
+        }
+        records = SparkSqlUtil.queryAll(ss, tableIdentity);
+        iterator = records.iterator();
+    }
+
+    @Override
+    public boolean next() throws IOException {
+        boolean hasNext = iterator != null && iterator.hasNext();
+        if (hasNext) {
+            currentRow = iterator.next();
+        }
+        return hasNext;
+    }
+
+    @Override
+    public String[] getRow() {
+        return getRowAsStringArray(currentRow);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.records = null;
+        this.iterator = null;
+        this.currentRow = null;
+    }
+}
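
A minimal sketch of the reader contract (names are illustrative; note that the reader materializes the whole table via collectAsList, so it is only intended for small sample tables):

    static void scanTable() throws IOException {
        NSparkTableReader reader = new NSparkTableReader("default", "sample_table"); // illustrative
        try {
            while (reader.next()) {
                String[] row = reader.getRow(); // each cell rendered via toString(); null cells stay null
            }
        } finally {
            reader.close();
        }
    }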
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java
new file mode 100644
index 0000000000..cea86bc9fa
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+
+import com.google.common.collect.Sets;
+
+import lombok.val;
+
+public class SparkSqlUtil {
+    public static Dataset<Row> query(SparkSession ss, String sql) {
+        return ss.sql(sql);
+    }
+
+    public static List<Row> queryForList(SparkSession ss, String sql) {
+        return ss.sql(sql).collectAsList();
+    }
+
+    public static List<Row> queryAll(SparkSession ss, String table) {
+        String sql = String.format(Locale.ROOT, "select * from %s", table);
+        return queryForList(ss, sql);
+    }
+
+    public static Set<String> getViewOrignalTables(String viewName, SparkSession spark) throws AnalysisException {
+        String viewText = spark.sql("desc formatted " + viewName).where("col_name = 'View Text'").head().getString(1);
+        val logicalPlan = spark.sessionState().sqlParser().parsePlan(viewText);
+        Set<String> viewTables = Sets.newHashSet();
+        for (Object l : scala.collection.JavaConverters.seqAsJavaListConverter(logicalPlan.collectLeaves()).asJava()) {
+            if (l instanceof UnresolvedRelation) {
+                val tableName = ((UnresolvedRelation) l).tableName();
+                //if nested view
+                if (spark.catalog().getTable(tableName).tableType().equals(CatalogTableType.VIEW().name())) {
+                    viewTables.addAll(getViewOrignalTables(tableName, spark));
+                } else {
+                    viewTables.add(tableName);
+                }
+            }
+        }
+        return viewTables;
+    }
+}
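
A minimal sketch of the utility entry points (table and view names are illustrative; getViewOrignalTables recursively resolves nested views down to their base tables):

    SparkSession ss = SparderEnv.getSparkSession();
    List<Row> rows = SparkSqlUtil.queryAll(ss, "default.sample_table");            // runs "select * from default.sample_table"
    Set<String> baseTables = SparkSqlUtil.getViewOrignalTables("sample_view", ss); // throws AnalysisException for an unknown view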
