This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 555358303a KYLIN-5217 Create a initial commit
555358303a is described below
commit 555358303a1a685ce3b6ca970f7a9d5f5029eb50
Author: biaobiao.sun <[email protected]>
AuthorDate: Tue Aug 9 10:36:12 2022 +0800
KYLIN-5217 Create a initial commit
---
.../kap/engine/spark/job/EnviromentAdaptor.java | 29 +
.../kap/engine/spark/job/IJobProgressReport.java | 37 ++
.../kap/engine/spark/job/ISparkJobHandler.java | 47 ++
.../kap/engine/spark/job/ParamsConstants.java | 29 +
.../kap/engine/spark/job/SparkAppDescription.java | 53 ++
.../engine/spark/application/SparkApplication.java | 620 +++++++++++++++++++++
.../kylin/engine/spark/application/SparkEntry.java | 27 +
.../spark/source/NSparkCubingSourceInput.java | 77 +++
.../engine/spark/source/NSparkDataSource.java | 79 +++
.../spark/source/NSparkMetadataExplorer.java | 313 +++++++++++
.../kylin/engine/spark/source/NSparkTable.java | 65 +++
.../kylin/engine/spark/source/NSparkTableMeta.java | 111 ++++
.../spark/source/NSparkTableMetaBuilder.java | 140 +++++
.../spark/source/NSparkTableMetaExplorer.java | 203 +++++++
.../engine/spark/source/NSparkTableReader.java | 87 +++
.../kylin/engine/spark/source/SparkSqlUtil.java | 66 +++
16 files changed, 1983 insertions(+)
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
new file mode 100644
index 0000000000..0301d20c91
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+import java.util.Map;
+
+import org.apache.spark.sql.SparkSession;
+
+public interface EnviromentAdaptor {
+
+
+ Boolean prepareEnviroment(SparkSession spark, Map<String, String> params);
+}
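
A minimal sketch of what an implementation of this interface could look like (the class name and the no-op behaviour are hypothetical, not part of this commit):

    package io.kyligence.kap.engine.spark.job;

    import java.util.Map;

    import org.apache.spark.sql.SparkSession;

    // Hypothetical adaptor that performs no environment preparation at all.
    public class NoopEnviromentAdaptor implements EnviromentAdaptor {

        @Override
        public Boolean prepareEnviroment(SparkSession spark, Map<String, String> params) {
            // A real adaptor might renew Kerberos tickets or export cluster-specific settings here.
            return Boolean.TRUE;
        }
    }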
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
new file mode 100644
index 0000000000..9e5e58f20f
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+import java.util.Map;
+
+public interface IJobProgressReport {
+
+    boolean updateSparkJobInfo(Map<String, String> params, String url, String json);
+
+    boolean updateSparkJobExtraInfo(Map<String, String> params, String url, String project, String jobId,
+            Map<String, String> extraInfo);
+
+    default boolean executeFinish(Map<String, String> params, String project, String jobId) {
+ return true;
+ }
+
+ default void initArgsParams(Map<String, String> argsParams) {
+ }
+
+}
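
To make the contract concrete, here is a hypothetical reporter that only logs instead of calling a REST endpoint (RestfulJobProgressReport, used later by SparkApplication, is the actual default; this class is illustrative only):

    package io.kyligence.kap.engine.spark.job;

    import java.util.Map;

    public class LoggingJobProgressReport implements IJobProgressReport {

        @Override
        public boolean updateSparkJobInfo(Map<String, String> params, String url, String json) {
            System.out.println("job info -> " + url + " : " + json);
            return true;
        }

        @Override
        public boolean updateSparkJobExtraInfo(Map<String, String> params, String url, String project, String jobId,
                Map<String, String> extraInfo) {
            System.out.println("extra info for " + project + "/" + jobId + " -> " + url + " : " + extraInfo);
            return true;
        }
        // executeFinish and initArgsParams keep their default implementations.
    }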
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java
new file mode 100644
index 0000000000..c167b52597
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+
+public interface ISparkJobHandler {
+
+    void killOrphanApplicationIfExists(String project, String jobStepId, KylinConfig config,
+ Map<String, String> sparkConf);
+
+ void checkApplicationJar(KylinConfig config) throws ExecuteException;
+
+    String createArgsFileOnRemoteFileSystem(KylinConfig config, String project, String jobId,
+ Map<String, String> params) throws ExecuteException;
+
+ Object generateSparkCmd(KylinConfig config, SparkAppDescription desc);
+
+ default void modifyDump(Properties props) {
+ }
+
+    default void prepareEnviroment(String project, String jobStepId, Map<String, String> params) {
+ }
+
+    Map<String, String> runSparkSubmit(Object cmd, String parentId) throws ExecuteException;
+
+}
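
The interface implies a submit flow roughly along these lines; the helper class below is purely illustrative (only the call order is suggested by the interface, it is not code from this commit):

    package io.kyligence.kap.engine.spark.job;

    import java.util.Map;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.job.exception.ExecuteException;

    class SparkSubmitFlowSketch {
        static Map<String, String> submit(ISparkJobHandler handler, KylinConfig config, String project, String jobId,
                SparkAppDescription desc, Map<String, String> params) throws ExecuteException {
            handler.checkApplicationJar(config);                                      // fail fast if the job jar is missing
            handler.createArgsFileOnRemoteFileSystem(config, project, jobId, params); // persist job args for the driver
            Object cmd = handler.generateSparkCmd(config, desc);                      // build the spark-submit command
            return handler.runSparkSubmit(cmd, jobId);                                // launch and collect result info
        }
    }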
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java
new file mode 100644
index 0000000000..e218d319c7
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+public class ParamsConstants {
+
+ private ParamsConstants() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static final String TIME_OUT = "time_out";
+ public static final String JOB_TMP_DIR = "job_tmp_dir";
+}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java
new file mode 100644
index 0000000000..8f105a5172
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+import java.util.Map;
+import java.util.Set;
+
+import lombok.Data;
+
+@Data
+public class SparkAppDescription {
+
+ private String hadoopConfDir;
+
+ private String kylinJobJar;
+
+ private String appArgs;
+
+ private String jobNamePrefix;
+
+ private String project;
+
+ private String jobId;
+
+ private int stepId;
+
+ private String sparkSubmitClassName;
+
+ private Map<String, String> sparkConf;
+
+ private Set<String> sparkJars;
+
+ private Set<String> sparkFiles;
+
+ private String comma;
+
+}
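
Because the class carries Lombok's @Data annotation, getters, setters, equals/hashCode and toString are generated at compile time; a hypothetical caller could populate it like this (values are illustrative only):

    package io.kyligence.kap.engine.spark.job;

    public class SparkAppDescriptionDemo {
        public static void main(String[] args) {
            SparkAppDescription desc = new SparkAppDescription();
            desc.setProject("demo_project");
            desc.setJobId("job-0001");
            desc.setStepId(1);
            desc.setSparkSubmitClassName("org.apache.kylin.engine.spark.application.SparkEntry");
            System.out.println(desc); // toString() comes from @Data
        }
    }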
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
new file mode 100644
index 0000000000..cac224d44b
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.application;
+
+import static org.apache.kylin.engine.spark.job.StageType.WAITE_FOR_RESOURCE;
+import static org.apache.kylin.engine.spark.utils.SparkConfHelper.COUNT_DISTICT;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.cluster.IClusterManager;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.metadata.MetadataStore;
+import org.apache.kylin.common.util.Application;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.TimeZoneUtils;
+import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.engine.spark.job.BuildJobInfos;
+import org.apache.kylin.engine.spark.job.KylinBuildEnv;
+import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
+import org.apache.kylin.engine.spark.job.ResourceDetect;
+import org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
+import org.apache.kylin.engine.spark.job.SegmentBuildJob;
+import org.apache.kylin.engine.spark.job.SparkJobConstants;
+import org.apache.kylin.engine.spark.job.UdfManager;
+import org.apache.kylin.engine.spark.scheduler.ClusterMonitor;
+import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
+import org.apache.kylin.engine.spark.utils.SparkConfHelper;
+import org.apache.kylin.metadata.cube.model.NBatchConstants;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.query.pushdown.SparkSubmitter;
+import org.apache.kylin.query.util.PushDownUtil;
+import org.apache.spark.SparkConf;
+import org.apache.spark.application.NoRetryException;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.sql.KylinSession;
+import org.apache.spark.sql.KylinSession$;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SparkSessionExtensions;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.rules.Rule;
+import org.apache.spark.sql.execution.datasource.AlignmentTableStats;
+import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
+import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import io.kyligence.kap.engine.spark.job.EnviromentAdaptor;
+import io.kyligence.kap.engine.spark.job.IJobProgressReport;
+import io.kyligence.kap.engine.spark.job.ParamsConstants;
+import lombok.val;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+public abstract class SparkApplication implements Application {
+    private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class);
+ private Map<String, String> params = Maps.newHashMap();
+ public static final String JOB_NAME_PREFIX = "job_step_";
+ private IJobProgressReport report;
+
+ protected volatile KylinConfig config;
+ protected volatile String jobId;
+ protected String project;
+ protected int layoutSize = -1;
+ protected BuildJobInfos infos;
+ /**
+ * path for spark app args on HDFS
+ */
+ protected String path;
+
+ private ClusterMonitor clusterMonitor;
+    private final AtomicLong atomicDisconnectSparkMasterTimes = new AtomicLong(0);
+    private final AtomicBoolean atomicUnreachableSparkMaster = new AtomicBoolean(false);
+    private final AtomicReference<SparkConf> atomicSparkConf = new AtomicReference<>(null);
+    private final AtomicReference<SparkSession> atomicSparkSession = new AtomicReference<>(null);
+    private final AtomicReference<KylinBuildEnv> atomicBuildEnv = new AtomicReference<>(null);
+
+ public void execute(String[] args) {
+ try {
+ path = args[0];
+ String argsLine = readArgsFromHDFS();
+ params = JsonUtil.readValueAsMap(argsLine);
+ logger.info("Execute {} with args : {}",
this.getClass().getName(), argsLine);
+ execute();
+ } catch (Exception e) {
+ throw new RuntimeException("Error execute " +
this.getClass().getName(), e);
+ }
+ }
+
+ public AtomicBoolean getAtomicUnreachableSparkMaster() {
+ return atomicUnreachableSparkMaster;
+ }
+
+ public final Map<String, String> getParams() {
+ return this.params;
+ }
+
+ public final String getParam(String key) {
+ return this.params.get(key);
+ }
+
+ public final void setParam(String key, String value) {
+ this.params.put(key, value);
+ }
+
+ public final boolean contains(String key) {
+ return params.containsKey(key);
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getProject() {
+ return project;
+ }
+
+ public KylinConfig getConfig() {
+ return config;
+ }
+
+ public IJobProgressReport getReport() {
+ if (report == null)
+ return new RestfulJobProgressReport();
+ return report;
+ }
+
+    /// backwards compatibility, must have been initialized before invoking #doExecute.
+ protected SparkSession ss;
+
+ public SparkSession getSparkSession() throws NoRetryException {
+ SparkSession sparkSession = atomicSparkSession.get();
+ if (Objects.isNull(sparkSession)) {
+ // shouldn't reach here
+ throw new NoRetryException("spark session shouldn't be null");
+ }
+ return sparkSession;
+ }
+
+ public String readArgsFromHDFS() {
+ val fs = HadoopUtil.getFileSystem(path);
+ String argsLine = null;
+ Path filePath = new Path(path);
+ try (FSDataInputStream inputStream = fs.open(filePath)) {
+            BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
+ argsLine = br.readLine();
+ } catch (IOException e) {
+ logger.error("Error occurred when reading args file: {}", path, e);
+ }
+ return argsLine;
+ }
+
+ /**
+ * get tracking url by application id
+ *
+ * @param sparkSession build sparkSession
+ * @return
+ */
+    public String getTrackingUrl(IClusterManager clusterManager, SparkSession sparkSession) {
+ return clusterManager.getBuildTrackingUrl(sparkSession);
+ }
+
+ private String tryReplaceHostAddress(String url) {
+ String originHost = null;
+ try {
+ URI uri = URI.create(url);
+ originHost = uri.getHost();
+            String hostAddress = InetAddress.getByName(originHost).getHostAddress();
+            return url.replace(originHost, hostAddress);
+        } catch (UnknownHostException uhe) {
+            logger.error("failed to get the ip address of {}, step back to use the origin tracking url.", originHost,
+                    uhe);
+ return url;
+ }
+ }
+
+    private Map<String, String> getTrackingInfo(SparkSession sparkSession, boolean ipAddressPreferred) {
+ IClusterManager clusterManager = atomicBuildEnv.get().clusterManager();
+ String applicationId = sparkSession.sparkContext().applicationId();
+ Map<String, String> extraInfo = new HashMap<>();
+ extraInfo.put("yarn_app_id", applicationId);
+ try {
+ String trackingUrl = getTrackingUrl(clusterManager, sparkSession);
+ if (StringUtils.isBlank(trackingUrl)) {
+ logger.warn("Get tracking url of application {}, but empty url
found.", applicationId);
+ return extraInfo;
+ }
+ if (ipAddressPreferred) {
+ trackingUrl = tryReplaceHostAddress(trackingUrl);
+ }
+ extraInfo.put("yarn_app_url", trackingUrl);
+ } catch (Exception e) {
+ logger.error("get tracking url failed!", e);
+ }
+ return extraInfo;
+ }
+
+ protected void exchangeSparkSession() {
+ exchangeSparkSession(atomicSparkConf.get());
+ }
+
+ protected final void execute() throws Exception {
+ String hdfsMetalUrl = getParam(NBatchConstants.P_DIST_META_URL);
+ jobId = getParam(NBatchConstants.P_JOB_ID);
+ project = getParam(NBatchConstants.P_PROJECT_NAME);
+ if (getParam(NBatchConstants.P_LAYOUT_IDS) != null) {
+            layoutSize = StringUtils.split(getParam(NBatchConstants.P_LAYOUT_IDS), ",").length;
+ }
+        try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig
+                .setAndUnsetThreadLocalConfig(KylinConfig.loadKylinConfigFromHdfs(hdfsMetalUrl))) {
+            config = autoCloseConfig.get();
+            report = (IJobProgressReport) ClassUtil.newInstance(config.getBuildJobProgressReporter());
+ report.initArgsParams(getParams());
+ //// KylinBuildEnv
+ final KylinBuildEnv buildEnv = KylinBuildEnv.getOrCreate(config);
+ atomicBuildEnv.set(buildEnv);
+ infos = buildEnv.buildJobInfos();
+ infos.recordJobId(jobId);
+ infos.recordProject(project);
+            infos.recordJobStepId(System.getProperty("spark.driver.param.taskId", jobId));
+
+ monitorSparkMaster();
+
+ HadoopUtil.setCurrentConfiguration(new Configuration());
+ ////////
+ exchangeSparkConf(buildEnv.sparkConf());
+
+ TimeZoneUtils.setDefaultTimeZone(config);
+
+ /// wait until resource is enough
+ waiteForResource(atomicSparkConf.get(), buildEnv);
+
+ ///
+ logger.info("Prepare job environment");
+ prepareSparkSession();
+
+ /// backwards compatibility
+ ss = getSparkSession();
+ val master = ss.conf().get(SparkLauncher.SPARK_MASTER, "");
+ if (!master.equals("local")) {
+ EnviromentAdaptor adaptor = (EnviromentAdaptor) ClassUtil
+ .newInstance(config.getBuildJobEnviromentAdaptor());
+ adaptor.prepareEnviroment(ss, params);
+ }
+
+ if (config.useDynamicS3RoleCredentialInTable()) {
+                val tableMetadataManager = NTableMetadataManager.getInstance(config, project);
+                tableMetadataManager.listAllTables().forEach(tableDesc -> SparderEnv
+                        .addS3CredentialFromTableToSpark(tableMetadataManager.getOrCreateTableExt(tableDesc), ss));
+ }
+
+ if (!config.isUTEnv()) {
+ Unsafe.setProperty("kylin.env", config.getDeployEnv());
+ }
+ logger.info("Start job");
+ infos.startJob();
+ // should be invoked after method prepareSparkSession
+ extraInit();
+
+ waiteForResourceSuccess();
+ doExecute();
+ // Output metadata to another folder
+ val resourceStore = ResourceStore.getKylinMetaStore(config);
+ val outputConfig = KylinConfig.createKylinConfig(config);
+            outputConfig.setMetadataUrl(getParam(NBatchConstants.P_OUTPUT_META_URL));
+            MetadataStore.createMetadataStore(outputConfig).dump(resourceStore);
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ if (infos != null) {
+ infos.jobEnd();
+ }
+ destroySparkSession();
+ extraDestroy();
+ executeFinish();
+ }
+ }
+
+ protected void handleException(Exception e) throws Exception {
+ throw e;
+ }
+
+ private SparkSession createSpark(SparkConf sparkConf) {
+ SparkSession.Builder sessionBuilder = SparkSession.builder()
+                .withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(SparkSessionExtensions v1) {
+                        v1.injectPostHocResolutionRule(new AbstractFunction1<SparkSession, Rule<LogicalPlan>>() {
+ @Override
+                            public Rule<LogicalPlan> apply(SparkSession session) {
+ return new AlignmentTableStats(session);
+ }
+ });
+ return BoxedUnit.UNIT;
+ }
+ }).enableHiveSupport().config(sparkConf)
+ .config("mapreduce.fileoutputcommitter.marksuccessfuljobs",
"false");
+
+        // If this is UT and SparkSession is already created, then use SparkSession.
+        // Otherwise, we always use KylinSession
+        boolean createWithSparkSession = !isJobOnCluster(sparkConf) && SparderEnv.isSparkAvailable();
+ if (createWithSparkSession) {
+            boolean isKylinSession = SparderEnv.getSparkSession() instanceof KylinSession;
+ createWithSparkSession = !isKylinSession;
+ }
+
+ if (createWithSparkSession) {
+ return sessionBuilder.getOrCreate();
+ } else {
+            return KylinSession$.MODULE$.KylinBuilder(sessionBuilder).buildCluster().getOrCreateKylinSession();
+ }
+ }
+
+ public boolean isJobOnCluster(SparkConf conf) {
+ return !Utils.isLocalMaster(conf) && !config.isUTEnv();
+ }
+
+ protected void extraInit() {
+ }
+
+ public void extraDestroy() {
+ if (clusterMonitor != null) {
+ clusterMonitor.shutdown();
+ }
+ }
+
+ protected abstract void doExecute() throws Exception;
+
+ protected void onLayoutFinished(long layoutId) {
+ //do nothing
+ }
+
+ protected void onExecuteFinished() {
+ //do nothing
+ }
+
+ protected String calculateRequiredCores() throws Exception {
+ return SparkJobConstants.DEFAULT_REQUIRED_CORES;
+ }
+
+ private void autoSetSparkConf(SparkConf sparkConf) throws Exception {
+ SparkConfHelper helper = new SparkConfHelper();
+ // copy user defined spark conf
+ if (sparkConf.getAll() != null) {
+            Arrays.stream(sparkConf.getAll()).forEach(config -> helper.setConf(config._1, config._2));
+ }
+ helper.setClusterManager(KylinBuildEnv.get().clusterManager());
+
+ chooseContentSize(helper);
+
+        helper.setOption(SparkConfHelper.LAYOUT_SIZE, Integer.toString(layoutSize));
+        helper.setOption(SparkConfHelper.REQUIRED_CORES, calculateRequiredCores());
+ helper.setConf(COUNT_DISTICT, hasCountDistinct().toString());
+ helper.generateSparkConf();
+ helper.applySparkConf(sparkConf);
+ }
+
+    private void waiteForResource(SparkConf sparkConf, KylinBuildEnv buildEnv) throws Exception {
+ val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
+ infos.recordStageId(waiteForResource.getId());
+ waiteForResource.execute();
+ }
+
+ protected void waiteForResourceSuccess() throws Exception {
+ val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
+ waiteForResource.onStageFinished(true);
+ infos.recordStageId("");
+ }
+
+ protected void executeFinish() {
+ try {
+ getReport().executeFinish(getReportParams(), project, getJobId());
+ } catch (Exception e) {
+ logger.error("executeFinish failed", e);
+ }
+ }
+
+ protected void chooseContentSize(SparkConfHelper helper) {
+ Path shareDir = config.getJobTmpShareDir(project, jobId);
+ // add content size with unit
+        helper.setOption(SparkConfHelper.SOURCE_TABLE_SIZE, chooseContentSize(shareDir));
+ }
+
+ protected boolean checkRangePartitionTableIsExist(NDataModel modelDesc) {
+        return modelDesc.getAllTableRefs().stream().anyMatch(p -> p.getTableDesc().isRangePartition());
+ }
+
+ protected String chooseContentSize(Path shareDir) {
+ // return size with unit
+ return ResourceDetectUtils.getMaxResourceSize(shareDir) + "b";
+ }
+
+ protected Boolean hasCountDistinct() throws IOException {
+ Path countDistinct = new Path(config.getJobTmpShareDir(project, jobId),
+ ResourceDetectUtils.countDistinctSuffix());
+        FileSystem fileSystem = countDistinct.getFileSystem(HadoopUtil.getCurrentConfiguration());
+ Boolean exist;
+ if (fileSystem.exists(countDistinct)) {
+ exist = ResourceDetectUtils.readResourcePathsAs(countDistinct);
+ } else {
+ exist = false;
+ logger.debug("File count_distinct.json doesn't exist, set
hasCountDistinct to false.");
+ }
+ logger.debug("Exist count distinct measure: {}", exist);
+ return exist;
+ }
+
+ public void logJobInfo() {
+ try {
+ logger.info(generateInfo());
+            if (KylinConfig.getInstanceFromEnv().skipRecordJobExecutionTime()) {
+ logger.info("skip record job wait and run time");
+ return;
+ }
+ Map<String, String> extraInfo = new HashMap<>();
+ extraInfo.put("yarn_job_wait_time", ((Long)
KylinBuildEnv.get().buildJobInfos().waitTime()).toString());
+ extraInfo.put("yarn_job_run_time", ((Long)
KylinBuildEnv.get().buildJobInfos().buildTime()).toString());
+
+            getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/wait_and_run_time", project, jobId,
+ extraInfo);
+ } catch (Exception e) {
+ logger.warn("Error occurred when generate job info.", e);
+ }
+ }
+
+ private Map<String, String> getReportParams() {
+ val reportParams = new HashMap<String, String>();
+        reportParams.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout()));
+        reportParams.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true));
+ return reportParams;
+ }
+
+ protected String generateInfo() {
+ return LogJobInfoUtils.sparkApplicationInfo();
+ }
+
+ public Set<String> getIgnoredSnapshotTables() {
+        return NSparkCubingUtil.toIgnoredTableSet(getParam(NBatchConstants.P_IGNORED_SNAPSHOT_TABLES));
+ }
+
+ protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
+ return config.getSparkConfigOverride();
+ }
+
+    protected void checkDateFormatIfExist(String project, String modelId) throws Exception {
+ if (config.isUTEnv()) {
+ return;
+ }
+ val modelManager = NDataModelManager.getInstance(config, project);
+ NDataModel modelDesc = modelManager.getDataModelDesc(modelId);
+ if (checkRangePartitionTableIsExist(modelDesc)) {
+ logger.info("Range partitioned tables do not support pushdown, so
do not need to perform subsequent logic");
+ return;
+ }
+
+ val partitionDesc = modelDesc.getPartitionDesc();
+ if (PartitionDesc.isEmptyPartitionDesc(partitionDesc)
+                || org.apache.commons.lang.StringUtils.isEmpty(partitionDesc.getPartitionDateFormat()))
+ return;
+
+        if (CatalogTableType.VIEW().name().equals(modelDesc.getRootFactTable().getTableDesc().getTableType()))
+ return;
+
+        String partitionColumn = modelDesc.getPartitionDesc().getPartitionDateColumnRef().getExpressionInSourceDB();
+
+ SparkSession sparkSession = atomicSparkSession.get();
+        try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance()
+ .overrideSparkSession(sparkSession)) {
+            String dateString = PushDownUtil.getFormatIfNotExist(modelDesc.getRootFactTableName(), partitionColumn,
+ project);
+            val sdf = new SimpleDateFormat(modelDesc.getPartitionDesc().getPartitionDateFormat(),
+ Locale.getDefault(Locale.Category.FORMAT));
+ val date = sdf.parse(dateString);
+ if (date == null || !dateString.equals(sdf.format(date))) {
+ throw new NoRetryException("date format not match");
+ }
+ } catch (KylinException ignore) {
+ // ignore it when pushdown return empty row
+ } catch (ParseException | NoRetryException e) {
+ throw new NoRetryException("date format not match");
+ }
+ }
+
+ private void exchangeSparkConf(SparkConf sparkConf) throws Exception {
+ if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) {
+ Map<String, String> baseSparkConf = getSparkConfigOverride(config);
+ if (!baseSparkConf.isEmpty()) {
+ baseSparkConf.forEach(sparkConf::set);
+                String baseSparkConfStr = JsonUtil.writeValueAsString(baseSparkConf);
+                logger.info("Override user-defined spark conf: {}", baseSparkConfStr);
+ }
+ if (config.isAutoSetSparkConf()) {
+ logger.info("Set spark conf automatically.");
+ try {
+ autoSetSparkConf(sparkConf);
+ } catch (Exception e) {
+ logger.warn("Auto set spark conf failed. Load spark conf
from system properties", e);
+ }
+ }
+ }
+
+ atomicSparkConf.set(sparkConf);
+ }
+
+ private void exchangeSparkSession(SparkConf sparkConf) {
+ SparkSession sparkSession = atomicSparkSession.get();
+ if (Objects.nonNull(sparkSession)) {
+ // destroy previous spark session
+ destroySparkSession();
+ }
+
+ sparkSession = createSpark(sparkConf);
+        if (!config.isUTEnv() && !sparkConf.get("spark.master").startsWith("k8s")) {
+            getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/spark", project, jobId,
+                    getTrackingInfo(sparkSession, config.isTrackingUrlIpAddressEnabled()));
+ }
+
+ // for spark metrics
+ JobMetricsUtils.registerListener(sparkSession);
+ SparderEnv.registerListener(sparkSession.sparkContext());
+
+ //#8341
+ SparderEnv.setSparkSession(sparkSession);
+ UdfManager.create(sparkSession);
+
+ ///
+ atomicSparkSession.set(sparkSession);
+ }
+
+ private void prepareSparkSession() throws NoRetryException {
+ SparkConf sparkConf = atomicSparkConf.get();
+ if (Objects.isNull(sparkConf)) {
+ // shouldn't reach here
+ throw new NoRetryException("spark conf shouldn't be null");
+ }
+
+ /// SegmentBuildJob only!!!
+        if (config.isSnapshotSpecifiedSparkConf() && (this instanceof SegmentBuildJob)) {
+            // snapshot specified spark conf, based on the exchanged spark conf.
+            SparkConf clonedSparkConf = sparkConf.clone();
+            Map<String, String> snapshotSparkConf = config.getSnapshotBuildingConfigOverride();
+            snapshotSparkConf.forEach(clonedSparkConf::set);
+            logger.info("exchange sparkSession using snapshot specified sparkConf");
+ exchangeSparkSession(clonedSparkConf);
+ return;
+ }
+ // normal logic
+ exchangeSparkSession(sparkConf);
+ }
+
+ private void destroySparkSession() {
+ SparkSession sparkSession = atomicSparkSession.get();
+ if (Objects.isNull(sparkSession)) {
+ logger.info("no initialized sparkSession instance");
+ return;
+ }
+ if (sparkSession.conf().get("spark.master").startsWith("local")) {
+            // for UT use? but very strange for resource detect mode (spark local).
+ return;
+ }
+ JobMetricsUtils.unRegisterListener(sparkSession);
+ sparkSession.stop();
+ }
+
+ private void monitorSparkMaster() {
+ clusterMonitor = new ClusterMonitor();
+        clusterMonitor.monitorSparkMaster(atomicBuildEnv, atomicSparkSession, atomicDisconnectSparkMasterTimes,
+ atomicUnreachableSparkMaster);
+ }
+
+}
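
Only doExecute() is abstract here; session setup, progress reporting and the metadata dump are all driven by execute(). A minimal hypothetical subclass follows (real jobs are launched through SparkEntry/JobWorkSpace, so the main method is for illustration only):

    package org.apache.kylin.engine.spark.application;

    public class HelloSparkApplication extends SparkApplication {

        @Override
        protected void doExecute() throws Exception {
            // getSparkSession() is safe here: execute() has already prepared the session.
            getSparkSession().sql("SELECT 1").show();
        }

        public static void main(String[] args) {
            new HelloSparkApplication().execute(args); // args[0] is the HDFS path of the job args file
        }
    }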
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java
new file mode 100644
index 0000000000..31974f65bc
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.application;
+
+import org.apache.spark.application.JobWorkSpace;
+
+public class SparkEntry {
+ public static void main(String[] args) {
+ JobWorkSpace.execute(args);
+ }
+}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
new file mode 100644
index 0000000000..5411c31bbd
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import static org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker.isNeedCreateHiveTemporaryTable;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.engine.spark.NSparkCubingEngine;
+import org.apache.kylin.engine.spark.job.KylinBuildEnv;
+import org.apache.kylin.engine.spark.utils.HiveTransactionTableHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.SparderTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+public class NSparkCubingSourceInput implements NSparkCubingEngine.NSparkCubingSource {
+    private static final Logger logger = LoggerFactory.getLogger(NSparkCubingSourceInput.class);
+
+ @Override
+    public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> params) {
+        ColumnDesc[] columnDescs = table.getColumns();
+        List<String> tblColNames = Lists.newArrayListWithCapacity(columnDescs.length);
+ StructType kylinSchema = new StructType();
+ for (ColumnDesc columnDesc : columnDescs) {
+ if (!columnDesc.isComputedColumn()) {
+ kylinSchema = kylinSchema.add(columnDesc.getName(),
+                        SparderTypeUtil.toSparkType(columnDesc.getType(), false), true);
+ tblColNames.add("`" + columnDesc.getName() + "`");
+ }
+ }
+ String[] colNames = tblColNames.toArray(new String[0]);
+ String colString = Joiner.on(",").join(colNames);
+ String sql;
+ KylinConfig kylinConfig = KylinBuildEnv.get().kylinConfig();
+        logger.info("isRangePartition:{};isTransactional:{};isReadTransactionalTableEnabled:{}",
+                table.isRangePartition(), table.isTransactional(), kylinConfig.isReadTransactionalTableEnabled());
+        if (isNeedCreateHiveTemporaryTable(table.isRangePartition(), table.isTransactional(),
+                kylinConfig.isReadTransactionalTableEnabled())) {
+            sql = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(table, params, colString,
+                    KylinBuildEnv.get());
+        } else {
+            sql = String.format(Locale.ROOT, "select %s from %s", colString, table.getIdentity());
+ }
+ Dataset<Row> df = ss.sql(sql);
+ StructType sparkSchema = df.schema();
+ logger.debug("Source data sql is: {}", sql);
+ logger.debug("Kylin schema: {}", kylinSchema.treeString());
+        return df.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema));
+ }
+}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java
new file mode 100644
index 0000000000..6a501bc71e
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.source;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.ISampleDataDeployer;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+import org.apache.kylin.engine.spark.NSparkCubingEngine;
+
+public class NSparkDataSource implements ISource {
+ // for reflection
+ public NSparkDataSource(KylinConfig config) {
+
+ }
+
+ @Override
+ public ISourceMetadataExplorer getSourceMetadataExplorer() {
+ return new NSparkMetadataExplorer();
+ }
+
+ @Override
+ public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+ if (engineInterface == NSparkCubingEngine.NSparkCubingSource.class) {
+ return (I) new NSparkCubingSourceInput();
+ } else {
+ throw new IllegalArgumentException("Unsupported engine interface:
" + engineInterface);
+ }
+ }
+
+ @Override
+ public IReadableTable createReadableTable(TableDesc tableDesc) {
+ return new NSparkTable(tableDesc);
+ }
+
+ @Override
+    public SegmentRange enrichSourcePartitionBeforeBuild(IBuildable buildable, SegmentRange srcPartition) {
+ return srcPartition;
+ }
+
+ @Override
+ public ISampleDataDeployer getSampleDataDeployer() {
+ return new NSparkMetadataExplorer();
+ }
+
+ @Override
+ public SegmentRange getSegmentRange(String start, String end) {
+ start = StringUtils.isEmpty(start) ? "0" : start;
+ end = StringUtils.isEmpty(end) ? "" + Long.MAX_VALUE : end;
+        return new SegmentRange.TimePartitionedSegmentRange(Long.parseLong(start), Long.parseLong(end));
+ }
+
+ @Override
+ public boolean supportBuildSnapShotByPartition() {
+ return true;
+ }
+
+}
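
A hypothetical read path through this source, assuming a SparkSession, a TableDesc and a params map are already available (the wrapper class is illustrative, not part of the commit):

    package org.apache.kylin.engine.spark.source;

    import java.util.Map;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.engine.spark.NSparkCubingEngine;
    import org.apache.kylin.metadata.model.TableDesc;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class NSparkDataSourceUsageSketch {
        static Dataset<Row> readTable(SparkSession ss, TableDesc tableDesc, Map<String, String> params) {
            NSparkDataSource source = new NSparkDataSource(KylinConfig.getInstanceFromEnv());
            NSparkCubingEngine.NSparkCubingSource input = source.adaptToBuildEngine(NSparkCubingEngine.NSparkCubingSource.class);
            return input.getSourceData(tableDesc, ss, params); // issues "select <cols> from <table>" under the hood
        }
    }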
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
new file mode 100644
index 0000000000..7e4f7deea0
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kylin.common.KapConfig;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.source.ISampleDataDeployer;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Database;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.internal.SQLConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.clearspring.analytics.util.Lists;
+import com.google.common.collect.Sets;
+
+import lombok.val;
+
+public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleDataDeployer, Serializable {
+
+    private static final Logger logger = LoggerFactory.getLogger(NSparkMetadataExplorer.class);
+
+ public static String generateCreateSchemaSql(String schemaName) {
+ return String.format(Locale.ROOT, "CREATE DATABASE IF NOT EXISTS %s",
schemaName);
+ }
+
+ public static String[] generateCreateTableSql(TableDesc tableDesc) {
+ String dropSql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity();
+
+ StringBuilder ddl = new StringBuilder();
+ ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n");
+ ddl.append("(" + "\n");
+
+ for (int i = 0; i < tableDesc.getColumns().length; i++) {
+ ColumnDesc col = tableDesc.getColumns()[i];
+ if (i > 0) {
+ ddl.append(",");
+ }
+ ddl.append(col.getName() + " " + col.getDatatype() + "\n");
+ }
+
+ ddl.append(")" + "\n");
+ ddl.append("USING com.databricks.spark.csv");
+
+ return new String[] { dropSql, ddl.toString() };
+ }
+
+ public NSparkTableMetaExplorer getTableMetaExplorer() {
+ return new NSparkTableMetaExplorer();
+ }
+
+ @Override
+ public List<String> listDatabases() throws Exception {
+ Dataset<Row> dataset = SparderEnv.getSparkSession().sql("show
databases").select("namespace");
+ return dataset.collectAsList().stream().map(row ->
row.getString(0)).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<String> listTables(String database) throws Exception {
+ val ugi = UserGroupInformation.getCurrentUser();
+ val config = KylinConfig.getInstanceFromEnv();
+ val spark = SparderEnv.getSparkSession();
+
+ List<String> tables = Lists.newArrayList();
+ try {
+ String sql = "show tables";
+ if (StringUtils.isNotBlank(database)) {
+ sql = String.format(Locale.ROOT, sql + " in %s", database);
+ }
+            Dataset<Row> dataset = SparderEnv.getSparkSession().sql(sql).select("tableName");
+            tables = dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList());
+
+            if (config.getTableAccessFilterEnable() && config.getKerberosProjectLevelEnable()
+ && UserGroupInformation.isSecurityEnabled()) {
+ List<String> accessTables = Lists.newArrayList();
+ for (String table : tables) {
+ val tableName = database + "." + table;
+ if (checkTableAccess(tableName)) {
+ accessTables.add(table);
+ }
+ }
+ return accessTables;
+ }
+ } catch (Exception e) {
+ logger.error("List hive tables failed. user: {}, db: {}",
ugi.getUserName(), database);
+ }
+
+ return tables;
+ }
+
+ public boolean checkTableAccess(String tableName) {
+ boolean isAccess = true;
+ try {
+ val spark = SparderEnv.getSparkSession();
+ val sparkTable = spark.catalog().getTable(tableName);
+ Set<String> needCheckTables = Sets.newHashSet();
+            if (sparkTable.tableType().equals(CatalogTableType.VIEW().name())) {
+                needCheckTables = SparkSqlUtil.getViewOrignalTables(tableName, SparderEnv.getSparkSession());
+ } else {
+ needCheckTables.add(tableName);
+ }
+            String hiveSpecFsLocation = spark.sessionState().conf().getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
+            FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem()
+                    : HadoopUtil.getFileSystem(hiveSpecFsLocation);
+            for (String table : needCheckTables) {
+                fs.listStatus(new Path(getLoc(spark, table, hiveSpecFsLocation)));
+ }
+ } catch (Exception e) {
+ isAccess = false;
+ try {
+ logger.error("Read hive table {} error:{}, ugi name: {}.",
tableName, e.getMessage(),
+ UserGroupInformation.getCurrentUser().getUserName());
+ } catch (IOException ex) {
+ logger.error("fetch user curr ugi info error.", e);
+ }
+ }
+ return isAccess;
+ }
+
+ @Override
+    public Pair<TableDesc, TableExtDesc> loadTableMetadata(final String database, String tableName, String prj)
+ throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+        NTableMetadataManager metaMgr = NTableMetadataManager.getInstance(config, prj);
+
+        NSparkTableMeta tableMeta = getTableMetaExplorer().getSparkTableMeta(database, tableName);
+ TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
+
+ // make a new TableDesc instance, don't modify the one in use
+ if (tableDesc == null) {
+ tableDesc = new TableDesc();
+ tableDesc.setDatabase(database.toUpperCase(Locale.ROOT));
+ tableDesc.setName(tableName.toUpperCase(Locale.ROOT));
+ tableDesc.setUuid(RandomUtil.randomUUIDStr());
+ tableDesc.setLastModified(0);
+ } else {
+ tableDesc = new TableDesc(tableDesc);
+ }
+
+ if (tableMeta.tableType != null) {
+ tableDesc.setTableType(tableMeta.tableType);
+ }
+ //set table type = spark
+ tableDesc.setSourceType(ISourceAware.ID_SPARK);
+ tableDesc.setTransactional(tableMeta.isTransactional);
+ tableDesc.setRangePartition(tableMeta.isRangePartition);
+
+        Set<String> partColumnSet = Optional.ofNullable(tableMeta.partitionColumns) //
+                .orElseGet(Collections::emptyList).stream().map(field -> field.name) //
+ .collect(Collectors.toSet());
+ int columnNumber = tableMeta.allColumns.size();
+ List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
+ for (int i = 0; i < columnNumber; i++) {
+            NSparkTableMeta.SparkTableColumnMeta field = tableMeta.allColumns.get(i);
+ ColumnDesc cdesc = new ColumnDesc();
+ cdesc.setName(field.name.toUpperCase(Locale.ROOT));
+ cdesc.setCaseSensitiveName(field.name);
+ // use "double" in kylin for "float"
+ if ("float".equalsIgnoreCase(field.dataType)) {
+ cdesc.setDatatype("double");
+ } else {
+ cdesc.setDatatype(field.dataType);
+ }
+ cdesc.setId(String.valueOf(i + 1));
+ cdesc.setComment(field.comment);
+ cdesc.setPartitioned(partColumnSet.contains(field.name));
+ columns.add(cdesc);
+ }
+ tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
+        List<String> partCols = tableMeta.partitionColumns.stream().map(col -> col.name).collect(Collectors.toList());
+        if (!partCols.isEmpty()) {
+            tableDesc.setPartitionColumn(partCols.get(0).toUpperCase(Locale.ROOT));
+ } else {
+ tableDesc.setPartitionColumn(null);
+ }
+ StringBuilder partitionColumnBuilder = new StringBuilder();
+ for (int i = 0, n = tableMeta.partitionColumns.size(); i < n; i++) {
+ if (i > 0)
+ partitionColumnBuilder.append(", ");
+            partitionColumnBuilder.append(tableMeta.partitionColumns.get(i).name.toUpperCase(Locale.ROOT));
+ }
+
+ TableExtDesc tableExtDesc = new TableExtDesc();
+ tableExtDesc.setIdentity(tableDesc.getIdentity());
+ tableExtDesc.setUuid(RandomUtil.randomUUIDStr());
+ tableExtDesc.setLastModified(0);
+ tableExtDesc.init(prj);
+
+        tableExtDesc.addDataSourceProp(TableExtDesc.LOCATION_PROPERTY_KEY, tableMeta.sdLocation);
+        tableExtDesc.addDataSourceProp("owner", tableMeta.owner);
+        tableExtDesc.addDataSourceProp("create_time", tableMeta.createTime);
+        tableExtDesc.addDataSourceProp("last_access_time", tableMeta.lastAccessTime);
+        tableExtDesc.addDataSourceProp("partition_column", partitionColumnBuilder.toString());
+        tableExtDesc.addDataSourceProp("total_file_size", String.valueOf(tableMeta.fileSize));
+        tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(tableMeta.fileNum));
+        tableExtDesc.addDataSourceProp("hive_inputFormat", tableMeta.sdInputFormat);
+        tableExtDesc.addDataSourceProp("hive_outputFormat", tableMeta.sdOutputFormat);
+        tableExtDesc.addDataSourceProp(TableExtDesc.S3_ROLE_PROPERTY_KEY, tableMeta.s3Role);
+        tableExtDesc.addDataSourceProp(TableExtDesc.S3_ENDPOINT_KEY, tableMeta.s3Endpoint);
+ return Pair.newPair(tableDesc, tableExtDesc);
+ }
+
+ @Override
+ public List<String> getRelatedKylinResources(TableDesc table) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean checkDatabaseAccess(String database) throws Exception {
+        boolean hiveDBAccessFilterEnable = KapConfig.getInstanceFromEnv().getDBAccessFilterEnable();
+ if (hiveDBAccessFilterEnable) {
+ logger.info("Check database {} access start.", database);
+ try {
+                Database db = SparderEnv.getSparkSession().catalog().getDatabase(database);
+            } catch (AnalysisException e) {
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+                logger.info("The current user: {} does not have permission to access database {}", ugi.getUserName(),
+ database);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean checkTablesAccess(Set<String> tables) {
+ return tables.stream().allMatch(this::checkTableAccess);
+ }
+
+ @Override
+    public Set<String> getTablePartitions(String database, String table, String prj, String partCol) {
+        return getTableMetaExplorer().checkAndGetTablePartitions(database, table, partCol);
+ }
+
+ @Override
+ public void createSampleDatabase(String database) throws Exception {
+ SparderEnv.getSparkSession().sql(generateCreateSchemaSql(database));
+ }
+
+ @Override
+ public void createSampleTable(TableDesc table) throws Exception {
+ String[] createTableSqls = generateCreateTableSql(table);
+ for (String sql : createTableSqls) {
+ SparderEnv.getSparkSession().sql(sql);
+ }
+ }
+
+ @Override
+    public void loadSampleData(String tableName, String tableFileDir) throws Exception {
+        Dataset<Row> dataset = SparderEnv.getSparkSession().read().csv(tableFileDir + "/" + tableName + ".csv").toDF();
+ if (tableName.indexOf(".") > 0) {
+ tableName = tableName.substring(tableName.indexOf(".") + 1);
+ }
+ dataset.createOrReplaceTempView(tableName);
+ }
+
+ @Override
+    public void createWrapperView(String origTableName, String viewName) throws Exception {
+        throw new UnsupportedOperationException("unsupport create wrapper view");
+ }
+
+    public String getLoc(SparkSession spark, String table, String hiveSpecFsLocation) {
+        String loc = spark.sql("desc formatted " + table).where("col_name == 'Location'").head().getString(1);
+ if (null == hiveSpecFsLocation || null == loc) {
+ return loc;
+ }
+ return loc.replace("hdfs://hacluster", hiveSpecFsLocation);
+ }
+}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java
new file mode 100644
index 0000000000..6ab6643154
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.source;
+
+import java.io.IOException;
+import java.util.Locale;
+
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NSparkTable implements IReadableTable {
+
+    private static final Logger logger = LoggerFactory.getLogger(NSparkTable.class);
+
+ final private String database;
+ final private String tableName;
+
+ public NSparkTable(TableDesc tableDesc) {
+ this.database = tableDesc.getDatabase();
+ this.tableName = tableDesc.getName();
+ }
+
+ @Override
+ public TableReader getReader() throws IOException {
+ return new NSparkTableReader(database, tableName);
+ }
+
+ @Override
+ public TableSignature getSignature() throws IOException {
+ // TODO: 07/12/2017 get modify time
+ String path = String.format(Locale.ROOT, "%s.%s", database, tableName);
+        long lastModified = System.currentTimeMillis(); // assume table is ever changing
+ int size = 0;
+ return new TableSignature(path, size, lastModified);
+ }
+
+ @Override
+ public boolean exists() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "spark: database=[" + database + "], table=[" + tableName + "]";
+ }
+
+}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java
new file mode 100644
index 0000000000..29f48bfbfc
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import java.util.List;
+
+public class NSparkTableMeta {
+ public static class SparkTableColumnMeta {
+ String name;
+ String dataType;
+ String comment;
+
+ public SparkTableColumnMeta(String name, String dataType, String comment) {
+ this.name = name;
+ this.dataType = dataType;
+ this.comment = comment;
+ }
+
+ @Override
+ public String toString() {
+ return "SparkTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='"
+ + comment + '\'' + '}';
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDataType() {
+ return dataType;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+ }
+
+ String tableName;
+ String sdLocation; // sd is short for storage descriptor
+ String sdInputFormat;
+ String sdOutputFormat;
+ String owner;
+ String provider;
+ String tableType;
+ String createTime;
+ String lastAccessTime;
+ long fileSize;
+ long fileNum;
+ boolean isNative;
+ List<SparkTableColumnMeta> allColumns;
+ List<SparkTableColumnMeta> partitionColumns;
+ boolean isTransactional;
+ boolean isRangePartition;
+ String s3Role;
+ String s3Endpoint;
+
+ public List<SparkTableColumnMeta> getAllColumns() {
+ return allColumns;
+ }
+
+ public NSparkTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat,
+ String owner, String provider, String tableType, String createTime, String lastAccessTime, long fileSize,
+ long fileNum, boolean isNative, List<SparkTableColumnMeta> allColumns,
+ List<SparkTableColumnMeta> partitionColumns, boolean isTransactional, boolean isRangePartition,
+ String s3Role, String s3Endpoint) {
+ this.tableName = tableName;
+ this.sdLocation = sdLocation;
+ this.sdInputFormat = sdInputFormat;
+ this.sdOutputFormat = sdOutputFormat;
+ this.owner = owner;
+ this.provider = provider;
+ this.tableType = tableType;
+ this.createTime = createTime;
+ this.lastAccessTime = lastAccessTime;
+ this.fileSize = fileSize;
+ this.fileNum = fileNum;
+ this.isNative = isNative;
+ this.allColumns = allColumns;
+ this.partitionColumns = partitionColumns;
+ this.isTransactional = isTransactional;
+ this.isRangePartition = isRangePartition;
+ this.s3Role = s3Role;
+ this.s3Endpoint = s3Endpoint;
+ }
+
+ @Override
+ public String toString() {
+ return "SparkTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\''
+ + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\''
+ + ", owner='" + owner + '\'' + ", provider='" + provider + '\'' + ", tableType='" + tableType + '\''
+ + ", createTime='" + createTime + '\'' + ", lastAccessTime=" + lastAccessTime + ", fileSize=" + fileSize
+ + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" + allColumns
+ + ", partitionColumns=" + partitionColumns + ", isTransactional=" + isTransactional
+ + ", isRangePartition=" + isRangePartition + ", s3Role=" + s3Role + ", s3Endpoint=" + s3Endpoint + '}';
+ }
+}
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java
new file mode 100644
index 0000000000..2d4a9eeae1
--- /dev/null
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.source;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+public class NSparkTableMetaBuilder {
+ private String tableName;
+ private String sdLocation;
+ private String sdInputFormat;
+ private String sdOutputFormat;
+ private String owner;
+ private String provider;
+ private String tableType;
+ private String createTime;
+ private String lastAccessTime;
+ private long fileSize;
+ private long fileNum;
+ private boolean isNative = true;
+ private List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayList();
+ private List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns = Lists.newArrayList();
+ private boolean isTransactional = false;
+ private boolean isRangePartition = false;
+ private String s3Role;
+ private String s3Endpoint;
+
+ public NSparkTableMetaBuilder setTableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setSdLocation(String sdLocation) {
+ this.sdLocation = sdLocation;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setSdInputFormat(String sdInputFormat) {
+ this.sdInputFormat = sdInputFormat;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setSdOutputFormat(String sdOutputFormat) {
+ this.sdOutputFormat = sdOutputFormat;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setOwner(String owner) {
+ this.owner = owner;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setProvider(String provider) {
+ this.provider = provider;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setTableType(String tableType) {
+ this.tableType = tableType;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setCreateTime(String createTime) {
+ this.createTime = createTime;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setLastAccessTime(String lastAccessTime) {
+ this.lastAccessTime = lastAccessTime;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setFileSize(long fileSize) {
+ this.fileSize = fileSize;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setFileNum(long fileNum) {
+ this.fileNum = fileNum;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setIsNative(boolean isNative) {
+ this.isNative = isNative;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setAllColumns(List<NSparkTableMeta.SparkTableColumnMeta> allColumns) {
+ this.allColumns = allColumns;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setPartitionColumns(List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns) {
+ this.partitionColumns = partitionColumns;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setIsTransactional(boolean isTransactional) {
+ this.isTransactional = isTransactional;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setIsRangePartition(boolean isRangePartition) {
+ this.isRangePartition = isRangePartition;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setS3Role(String s3Role) {
+ this.s3Role = s3Role;
+ return this;
+ }
+
+ public NSparkTableMetaBuilder setS3Endpoint(String s3Endpoint) {
+ this.s3Endpoint = s3Endpoint;
+ return this;
+ }
+
+ public NSparkTableMeta createSparkTableMeta() {
+ return new NSparkTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, provider, tableType,
+ createTime, lastAccessTime, fileSize, fileNum, isNative, allColumns, partitionColumns, isTransactional,
+ isRangePartition, s3Role, s3Endpoint);
+ }
+}
\ No newline at end of file
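A small sketch of the builder's intended fluent use (not part of this commit; the field values are illustrative only):

    import org.apache.kylin.engine.spark.source.NSparkTableMeta;
    import org.apache.kylin.engine.spark.source.NSparkTableMetaBuilder;

    public class BuilderExample {
        public static NSparkTableMeta example() {
            // Unset fields keep the builder defaults (empty column lists, isNative = true, etc.)
            return new NSparkTableMetaBuilder()
                    .setTableName("kylin_sales")
                    .setOwner("admin")
                    .setProvider("hive")
                    .setIsTransactional(false)
                    .createSparkTableMeta();
        }
    }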
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
new file mode 100644
index 0000000000..033edb84f9
--- /dev/null
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.jnet.Installer;
+import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import scala.Option;
+import scala.collection.JavaConversions;
+
+public class NSparkTableMetaExplorer implements Serializable {
+
+ private static final Logger logger = LoggerFactory.getLogger(NSparkTableMetaExplorer.class);
+
+ enum PROVIDER {
+ HIVE("hive"), UNSPECIFIED("");
+
+ private static final PROVIDER[] ALL = new PROVIDER[] { HIVE };
+ private String value;
+
+ PROVIDER(String value) {
+ this.value = value;
+ }
+
+ public static PROVIDER fromString(Option<String> value) {
+ if (value.isEmpty()) {
+ return UNSPECIFIED;
+ }
+
+ for (PROVIDER provider : ALL) {
+ if (provider.value.equals(value.get())) {
+ return provider;
+ }
+ }
+ return UNSPECIFIED;
+ }
+ }
+
+ private static final List<String> UNSUPPORTED_TYPES = Lists.newArrayList("array", "map", "struct", "binary");
+ private static final String CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING";
+ public static final String S3_ROLE_PROPERTY_KEY = "role";
+ public static final String S3_ENDPOINT_PROPERTY_KEY = "s3_endpoint";
+
+ public NSparkTableMeta getSparkTableMeta(String database, String tableName) {
+ SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog();
+ TableIdentifier tableIdentifier = TableIdentifier.apply(tableName,
+ Option.apply(database.isEmpty() ? null : database));
+ CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
+ checkTableIsValid(tableMetadata, tableIdentifier, tableName);
+ return getSparkTableMeta(tableName, tableMetadata);
+ }
+
+ public Set<String> checkAndGetTablePartitions(String database, String tableName, String partitionCol) {
+ SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog();
+ TableIdentifier tableIdentifier = TableIdentifier.apply(tableName,
+ Option.apply(database.isEmpty() ? null : database));
+
+ CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
+
+ String firstPartCol = tableMetadata.partitionColumnNames().isEmpty() ? null
+ : tableMetadata.partitionColumnNames().head().toLowerCase(Locale.ROOT);
+
+ if (!partitionCol.equalsIgnoreCase(firstPartCol)) {
+ throw new IllegalArgumentException(
+ String.format(Locale.ROOT, "table partition column %s does not match column %s", firstPartCol, partitionCol));
+ }
+ return JavaConversions.seqAsJavaList(catalog.listPartitions(tableIdentifier, Option.empty())).stream()
+ .map(item -> JavaConversions.mapAsJavaMap(item.spec()).entrySet().stream()
+ .filter(entry -> partitionCol.equalsIgnoreCase(entry.getKey())) //
+ .findFirst() //
+ .map(Map.Entry::getValue) //
+ .orElse(null))
+ .filter(Objects::nonNull).collect(Collectors.toSet());
+ }
+
+ private NSparkTableMeta getSparkTableMeta(String tableName, CatalogTable tableMetadata) {
+ NSparkTableMetaBuilder builder = new NSparkTableMetaBuilder();
+ builder.setTableName(tableName);
+ builder.setAllColumns(getColumns(tableMetadata, tableMetadata.schema()));
+ builder.setOwner(tableMetadata.owner());
+ builder.setCreateTime(tableMetadata.createTime() + "");
+ builder.setLastAccessTime(tableMetadata.lastAccessTime() + "");
+ builder.setTableType(tableMetadata.tableType().name());
+ builder.setPartitionColumns(getColumns(tableMetadata, tableMetadata.partitionSchema()));
+ builder.setIsRangePartition(isRangePartition(tableMetadata));
+ if (tableMetadata.storage().inputFormat().isDefined()) {
+ builder.setSdInputFormat(tableMetadata.storage().inputFormat().get());
+ }
+ if (tableMetadata.storage().outputFormat().isDefined()) {
+ builder.setSdOutputFormat(tableMetadata.storage().outputFormat().get());
+ }
+ Option<URI> uriOption = tableMetadata.storage().locationUri();
+ if (uriOption.isDefined()) {
+ builder.setSdLocation(uriOption.get().toString());
+ }
+ if (tableMetadata.provider().isDefined()) {
+ builder.setProvider(tableMetadata.provider().get());
+ }
+ if (tableMetadata.properties().contains("totalSize")) {
+ builder.setFileSize(Long.parseLong(tableMetadata.properties().get("totalSize").get()));
+ }
+ if (tableMetadata.properties().contains("numFiles")) {
+ builder.setFileNum(Long.parseLong(tableMetadata.properties().get("numFiles").get()));
+ }
+ if (tableMetadata.properties().contains("transactional")) {
+ builder.setIsTransactional(Boolean.parseBoolean(tableMetadata.properties().get("transactional").get()));
+ }
+ if (tableMetadata.properties().contains(S3_ROLE_PROPERTY_KEY)) {
+ builder.setS3Role(tableMetadata.properties().get(S3_ROLE_PROPERTY_KEY).get());
+ }
+
+ if (tableMetadata.properties().contains(S3_ENDPOINT_PROPERTY_KEY)) {
+ builder.setS3Endpoint(tableMetadata.properties().get(S3_ENDPOINT_PROPERTY_KEY).get());
+ }
+ return builder.createSparkTableMeta();
+ }
+
+ private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema) {
+ return getColumns(tableMetadata, schema, true);
+ }
+
+ private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema,
+ boolean isCheckRepeatColumn) {
+ List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayListWithCapacity(schema.size());
+ Set<String> columnCacheTemp = Sets.newHashSet();
+ for (org.apache.spark.sql.types.StructField field : schema.fields()) {
+ String type = field.dataType().simpleString();
+ if (field.metadata().contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) {
+ type = field.metadata().getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY);
+ }
+ String finalType = type;
+ if (UNSUPPORTED_TYPES.stream().anyMatch(finalType::contains)) {
+ logger.info("Load table {}: ignoring column {}:{} (unsupported type)",
+ tableMetadata.identifier().identifier(), field.name(), finalType);
+ continue;
+ }
+ if (isCheckRepeatColumn && columnCacheTemp.contains(field.name())) {
+ logger.info("The [{}] column is already included and does not need to be added again", field.name());
+ continue;
+ }
+ columnCacheTemp.add(field.name());
+ allColumns.add(new NSparkTableMeta.SparkTableColumnMeta(field.name(), type,
+ field.getComment().isDefined() ? field.getComment().get() : null));
+ }
+
+ return allColumns;
+ }
+
+ private void checkTableIsValid(CatalogTable tableMetadata, TableIdentifier tableIdentifier, String tableName) {
+ if (CatalogTableType.VIEW().equals(tableMetadata.tableType())) {
+ try {
+ Installer.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
+ SparderEnv.getSparkSession().table(tableIdentifier).queryExecution().analyzed();
+ } catch (Throwable e) {
+ logger.error("Error parsing view: " + tableName, e);
+ throw new RuntimeException("Error parsing view: " + tableName + ", " + e.getMessage()
+ + " (there may be syntactic differences between Hive and Spark SQL)", e);
+ }
+ }
+ }
+
+ private Boolean isRangePartition(CatalogTable tableMetadata) {
+ List<NSparkTableMeta.SparkTableColumnMeta> allColumns = getColumns(tableMetadata, tableMetadata.schema(), false);
+ return allColumns.stream().collect(Collectors.groupingBy(p -> p.name)).values().stream()
+ .anyMatch(p -> p.size() > 1);
+ }
+}
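A minimal usage sketch of the explorer above (not part of this commit; the database, table, and partition column names are hypothetical, and SparderEnv is assumed to hold a live Hive-enabled session):

    import java.util.Set;
    import org.apache.kylin.engine.spark.source.NSparkTableMeta;
    import org.apache.kylin.engine.spark.source.NSparkTableMetaExplorer;

    public class MetaExplorerExample {
        public static void main(String[] args) {
            NSparkTableMetaExplorer explorer = new NSparkTableMetaExplorer();
            // Resolve column and storage metadata from the Spark session catalog
            NSparkTableMeta meta = explorer.getSparkTableMeta("DEFAULT", "KYLIN_SALES");
            System.out.println(meta.getAllColumns());
            // List partition values, verifying the first partition column matches the requested one
            Set<String> partitions = explorer.checkAndGetTablePartitions("DEFAULT", "KYLIN_SALES", "PART_DT");
            System.out.println(partitions);
        }
    }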
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java
new file mode 100644
index 0000000000..669f418c9c
--- /dev/null
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.kylin.source.IReadableTable.TableReader;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructField;
+
+public class NSparkTableReader implements TableReader {
+ private String dbName;
+ private String tableName;
+ private SparkSession ss;
+ private List<Row> records;
+ private Iterator<Row> iterator;
+ private Row currentRow;
+
+ public NSparkTableReader(String dbName, String tableName) {
+ this.dbName = dbName;
+ this.tableName = tableName;
+ initialize();
+ }
+
+ public static String[] getRowAsStringArray(Row record) {
+ StructField[] fields = record.schema().fields();
+ String[] arr = new String[fields.length];
+ for (int i = 0; i < arr.length; i++) {
+ Object o = record.get(i);
+ arr[i] = (o == null) ? null : o.toString();
+ }
+ return arr;
+ }
+
+ private void initialize() {
+ ss = SparderEnv.getSparkSession();
+ String master = ss.sparkContext().master();
+ String tableIdentity = tableName;
+ // Spark SQL cannot use a database prefix for a temp view created from CSV, but Hive tables need the database prefix
+ if (!master.toLowerCase(Locale.ROOT).contains("local")) {
+ tableIdentity = String.format(Locale.ROOT, "%s.%s", dbName, tableName);
+ }
+ records = SparkSqlUtil.queryAll(ss, tableIdentity);
+ iterator = records.iterator();
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ boolean hasNext = iterator != null && iterator.hasNext();
+ if (hasNext) {
+ currentRow = iterator.next();
+ }
+ return hasNext;
+ }
+
+ @Override
+ public String[] getRow() {
+ return getRowAsStringArray(currentRow);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.records = null;
+ this.iterator = null;
+ this.currentRow = null;
+ }
+}
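A small sketch of the static row-conversion helper above (not part of this commit; a local SparkSession is assumed purely for illustration):

    import java.util.Arrays;
    import org.apache.kylin.engine.spark.source.NSparkTableReader;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class RowToStringExample {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();
            Row row = spark.sql("select 1 as id, cast(null as string) as name").head();
            // Nulls stay null; every other value is rendered via toString()
            System.out.println(Arrays.toString(NSparkTableReader.getRowAsStringArray(row)));
        }
    }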
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java
new file mode 100644
index 0000000000..cea86bc9fa
--- /dev/null
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.engine.spark.source;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+
+import com.google.common.collect.Sets;
+
+import lombok.val;
+
+public class SparkSqlUtil {
+ public static Dataset<Row> query(SparkSession ss, String sql) {
+ return ss.sql(sql);
+ }
+
+ public static List<Row> queryForList(SparkSession ss, String sql) {
+ return ss.sql(sql).collectAsList();
+ }
+
+ public static List<Row> queryAll(SparkSession ss, String table) {
+ String sql = String.format(Locale.ROOT, "select * from %s", table);
+ return queryForList(ss, sql);
+ }
+
+ public static Set<String> getViewOrignalTables(String viewName, SparkSession spark) throws AnalysisException {
+ String viewText = spark.sql("desc formatted " + viewName).where("col_name = 'View Text'").head().getString(1);
+ val logicalPlan = spark.sessionState().sqlParser().parsePlan(viewText);
+ Set<String> viewTables = Sets.newHashSet();
+ for (Object l : scala.collection.JavaConverters.seqAsJavaListConverter(logicalPlan.collectLeaves()).asJava()) {
+ if (l instanceof UnresolvedRelation) {
+ val tableName = ((UnresolvedRelation) l).tableName();
+ // if nested view
+ if (spark.catalog().getTable(tableName).tableType().equals(CatalogTableType.VIEW().name())) {
+ viewTables.addAll(getViewOrignalTables(tableName, spark));
+ } else {
+ viewTables.add(tableName);
+ }
+ }
+ }
+ return viewTables;
+ }
+}
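A minimal usage sketch of SparkSqlUtil.getViewOrignalTables (not part of this commit; the view name is an assumption, and a Hive-enabled SparkSession is required so the view text can be resolved):

    import java.util.Set;
    import org.apache.kylin.engine.spark.source.SparkSqlUtil;
    import org.apache.spark.sql.SparkSession;

    public class ViewTablesExample {
        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate();
            // Recursively resolves nested views down to their underlying tables
            Set<String> tables = SparkSqlUtil.getViewOrignalTables("default.my_view", spark);
            System.out.println(tables);
        }
    }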