This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
     new 555358303a KYLIN-5217 Create a initial commit
555358303a is described below

commit 555358303a1a685ce3b6ca970f7a9d5f5029eb50
Author: biaobiao.sun <1319027...@qq.com>
AuthorDate: Tue Aug 9 10:36:12 2022 +0800

    KYLIN-5217 Create a initial commit
---
 .../kap/engine/spark/job/EnviromentAdaptor.java     |  29 +
 .../kap/engine/spark/job/IJobProgressReport.java    |  37 ++
 .../kap/engine/spark/job/ISparkJobHandler.java      |  47 ++
 .../kap/engine/spark/job/ParamsConstants.java       |  29 +
 .../kap/engine/spark/job/SparkAppDescription.java   |  53 ++
 .../engine/spark/application/SparkApplication.java  | 620 +++++++++++++++++++++
 .../kylin/engine/spark/application/SparkEntry.java  |  27 +
 .../spark/source/NSparkCubingSourceInput.java       |  77 +++
 .../engine/spark/source/NSparkDataSource.java       |  79 +++
 .../spark/source/NSparkMetadataExplorer.java        | 313 +++++++++++
 .../kylin/engine/spark/source/NSparkTable.java      |  65 +++
 .../kylin/engine/spark/source/NSparkTableMeta.java  | 111 ++++
 .../spark/source/NSparkTableMetaBuilder.java        | 140 +++++
 .../spark/source/NSparkTableMetaExplorer.java       | 203 +++++++
 .../engine/spark/source/NSparkTableReader.java      |  87 +++
 .../kylin/engine/spark/source/SparkSqlUtil.java     |  66 +++
 16 files changed, 1983 insertions(+)

diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
new file mode 100644
index 0000000000..0301d20c91
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.engine.spark.job;
+
+import java.util.Map;
+
+import org.apache.spark.sql.SparkSession;
+
+public interface EnviromentAdaptor {
+
+
+    Boolean prepareEnviroment(SparkSession spark, Map<String, String> params);
+}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
new file mode 100644
index 0000000000..9e5e58f20f
--- /dev/null
+++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.kyligence.kap.engine.spark.job; + +import java.util.Map; + +public interface IJobProgressReport { + + boolean updateSparkJobInfo(Map<String, String> params, String url, String json); + + boolean updateSparkJobExtraInfo(Map<String, String> params, String url, String project, String jobId, + Map<String, String> extraInfo); + + default boolean executeFinish(Map<String, String> params, String project, String jobId) { + return true; + } + + default void initArgsParams(Map<String, String> argsParams) { + } + +} diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java new file mode 100644 index 0000000000..c167b52597 --- /dev/null +++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.kyligence.kap.engine.spark.job; + +import java.util.Map; +import java.util.Properties; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.job.exception.ExecuteException; + +public interface ISparkJobHandler { + + void killOrphanApplicationIfExists(String project, String jobStepId, KylinConfig config, + Map<String, String> sparkConf); + + void checkApplicationJar(KylinConfig config) throws ExecuteException; + + String createArgsFileOnRemoteFileSystem(KylinConfig config, String project, String jobId, + Map<String, String> params) throws ExecuteException; + + Object generateSparkCmd(KylinConfig config, SparkAppDescription desc); + + default void modifyDump(Properties props) { + } + + default void prepareEnviroment(String project, String jobStepId, Map<String, String> params) { + } + + Map<String, String> runSparkSubmit(Object cmd, String parentId) throws ExecuteException; + +} diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java new file mode 100644 index 0000000000..e218d319c7 --- /dev/null +++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.kyligence.kap.engine.spark.job; + +public class ParamsConstants { + + private ParamsConstants() { + throw new IllegalStateException("Utility class"); + } + + public static final String TIME_OUT = "time_out"; + public static final String JOB_TMP_DIR = "job_tmp_dir"; +} diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java new file mode 100644 index 0000000000..8f105a5172 --- /dev/null +++ b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.kyligence.kap.engine.spark.job; + +import java.util.Map; +import java.util.Set; + +import lombok.Data; + +@Data +public class SparkAppDescription { + + private String hadoopConfDir; + + private String kylinJobJar; + + private String appArgs; + + private String jobNamePrefix; + + private String project; + + private String jobId; + + private int stepId; + + private String sparkSubmitClassName; + + private Map<String, String> sparkConf; + + private Set<String> sparkJars; + + private Set<String> sparkFiles; + + private String comma; + +} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java new file mode 100644 index 0000000000..cac224d44b --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark.application; + +import static org.apache.kylin.engine.spark.job.StageType.WAITE_FOR_RESOURCE; +import static org.apache.kylin.engine.spark.utils.SparkConfHelper.COUNT_DISTICT; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.charset.Charset; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.cluster.IClusterManager; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.metadata.MetadataStore; +import org.apache.kylin.common.util.Application; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.TimeZoneUtils; +import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.engine.spark.job.BuildJobInfos; +import org.apache.kylin.engine.spark.job.KylinBuildEnv; +import org.apache.kylin.engine.spark.job.LogJobInfoUtils; +import org.apache.kylin.engine.spark.job.NSparkCubingUtil; +import org.apache.kylin.engine.spark.job.ResourceDetect; +import org.apache.kylin.engine.spark.job.RestfulJobProgressReport; +import org.apache.kylin.engine.spark.job.SegmentBuildJob; +import org.apache.kylin.engine.spark.job.SparkJobConstants; +import org.apache.kylin.engine.spark.job.UdfManager; +import org.apache.kylin.engine.spark.scheduler.ClusterMonitor; +import org.apache.kylin.engine.spark.utils.JobMetricsUtils; +import org.apache.kylin.engine.spark.utils.SparkConfHelper; +import org.apache.kylin.metadata.cube.model.NBatchConstants; +import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.model.NTableMetadataManager; +import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.query.pushdown.SparkSubmitter; +import org.apache.kylin.query.util.PushDownUtil; +import org.apache.spark.SparkConf; +import org.apache.spark.application.NoRetryException; +import org.apache.spark.launcher.SparkLauncher; +import org.apache.spark.sql.KylinSession; +import org.apache.spark.sql.KylinSession$; +import org.apache.spark.sql.SparderEnv; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.SparkSessionExtensions; +import org.apache.spark.sql.catalyst.catalog.CatalogTableType; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.catalyst.rules.Rule; +import org.apache.spark.sql.execution.datasource.AlignmentTableStats; +import org.apache.spark.sql.hive.utils.ResourceDetectUtils; +import org.apache.spark.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +import 
io.kyligence.kap.engine.spark.job.EnviromentAdaptor; +import io.kyligence.kap.engine.spark.job.IJobProgressReport; +import io.kyligence.kap.engine.spark.job.ParamsConstants; +import lombok.val; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +public abstract class SparkApplication implements Application { + private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class); + private Map<String, String> params = Maps.newHashMap(); + public static final String JOB_NAME_PREFIX = "job_step_"; + private IJobProgressReport report; + + protected volatile KylinConfig config; + protected volatile String jobId; + protected String project; + protected int layoutSize = -1; + protected BuildJobInfos infos; + /** + * path for spark app args on HDFS + */ + protected String path; + + private ClusterMonitor clusterMonitor; + private final AtomicLong atomicDisconnectSparkMasterTimes = new AtomicLong(0); + private final AtomicBoolean atomicUnreachableSparkMaster = new AtomicBoolean(false); + private final AtomicReference<SparkConf> atomicSparkConf = new AtomicReference<>(null); + private final AtomicReference<SparkSession> atomicSparkSession = new AtomicReference<>(null); + private final AtomicReference<KylinBuildEnv> atomicBuildEnv = new AtomicReference<>(null); + + public void execute(String[] args) { + try { + path = args[0]; + String argsLine = readArgsFromHDFS(); + params = JsonUtil.readValueAsMap(argsLine); + logger.info("Execute {} with args : {}", this.getClass().getName(), argsLine); + execute(); + } catch (Exception e) { + throw new RuntimeException("Error execute " + this.getClass().getName(), e); + } + } + + public AtomicBoolean getAtomicUnreachableSparkMaster() { + return atomicUnreachableSparkMaster; + } + + public final Map<String, String> getParams() { + return this.params; + } + + public final String getParam(String key) { + return this.params.get(key); + } + + public final void setParam(String key, String value) { + this.params.put(key, value); + } + + public final boolean contains(String key) { + return params.containsKey(key); + } + + public String getJobId() { + return jobId; + } + + public String getProject() { + return project; + } + + public KylinConfig getConfig() { + return config; + } + + public IJobProgressReport getReport() { + if (report == null) + return new RestfulJobProgressReport(); + return report; + } + + /// backwards compatibility, must have been initialized before invoking #doExecute. 
+ protected SparkSession ss; + + public SparkSession getSparkSession() throws NoRetryException { + SparkSession sparkSession = atomicSparkSession.get(); + if (Objects.isNull(sparkSession)) { + // shouldn't reach here + throw new NoRetryException("spark session shouldn't be null"); + } + return sparkSession; + } + + public String readArgsFromHDFS() { + val fs = HadoopUtil.getFileSystem(path); + String argsLine = null; + Path filePath = new Path(path); + try (FSDataInputStream inputStream = fs.open(filePath)) { + BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, Charset.defaultCharset())); + argsLine = br.readLine(); + } catch (IOException e) { + logger.error("Error occurred when reading args file: {}", path, e); + } + return argsLine; + } + + /** + * get tracking url by application id + * + * @param sparkSession build sparkSession + * @return + */ + public String getTrackingUrl(IClusterManager clusterManager, SparkSession sparkSession) { + return clusterManager.getBuildTrackingUrl(sparkSession); + } + + private String tryReplaceHostAddress(String url) { + String originHost = null; + try { + URI uri = URI.create(url); + originHost = uri.getHost(); + String hostAddress = InetAddress.getByName(originHost).getHostAddress(); + return url.replace(originHost, hostAddress); + } catch (UnknownHostException uhe) { + logger.error("failed to get the ip address of {}, step back to use the origin tracking url.", originHost, + uhe); + return url; + } + } + + private Map<String, String> getTrackingInfo(SparkSession sparkSession, boolean ipAddressPreferred) { + IClusterManager clusterManager = atomicBuildEnv.get().clusterManager(); + String applicationId = sparkSession.sparkContext().applicationId(); + Map<String, String> extraInfo = new HashMap<>(); + extraInfo.put("yarn_app_id", applicationId); + try { + String trackingUrl = getTrackingUrl(clusterManager, sparkSession); + if (StringUtils.isBlank(trackingUrl)) { + logger.warn("Get tracking url of application {}, but empty url found.", applicationId); + return extraInfo; + } + if (ipAddressPreferred) { + trackingUrl = tryReplaceHostAddress(trackingUrl); + } + extraInfo.put("yarn_app_url", trackingUrl); + } catch (Exception e) { + logger.error("get tracking url failed!", e); + } + return extraInfo; + } + + protected void exchangeSparkSession() { + exchangeSparkSession(atomicSparkConf.get()); + } + + protected final void execute() throws Exception { + String hdfsMetalUrl = getParam(NBatchConstants.P_DIST_META_URL); + jobId = getParam(NBatchConstants.P_JOB_ID); + project = getParam(NBatchConstants.P_PROJECT_NAME); + if (getParam(NBatchConstants.P_LAYOUT_IDS) != null) { + layoutSize = StringUtils.split(getParam(NBatchConstants.P_LAYOUT_IDS), ",").length; + } + try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig + .setAndUnsetThreadLocalConfig(KylinConfig.loadKylinConfigFromHdfs(hdfsMetalUrl))) { + config = autoCloseConfig.get(); + report = (IJobProgressReport) ClassUtil.newInstance(config.getBuildJobProgressReporter()); + report.initArgsParams(getParams()); + //// KylinBuildEnv + final KylinBuildEnv buildEnv = KylinBuildEnv.getOrCreate(config); + atomicBuildEnv.set(buildEnv); + infos = buildEnv.buildJobInfos(); + infos.recordJobId(jobId); + infos.recordProject(project); + infos.recordJobStepId(System.getProperty("spark.driver.param.taskId", jobId)); + + monitorSparkMaster(); + + HadoopUtil.setCurrentConfiguration(new Configuration()); + //////// + exchangeSparkConf(buildEnv.sparkConf()); + + 
TimeZoneUtils.setDefaultTimeZone(config); + + /// wait until resource is enough + waiteForResource(atomicSparkConf.get(), buildEnv); + + /// + logger.info("Prepare job environment"); + prepareSparkSession(); + + /// backwards compatibility + ss = getSparkSession(); + val master = ss.conf().get(SparkLauncher.SPARK_MASTER, ""); + if (!master.equals("local")) { + EnviromentAdaptor adaptor = (EnviromentAdaptor) ClassUtil + .newInstance(config.getBuildJobEnviromentAdaptor()); + adaptor.prepareEnviroment(ss, params); + } + + if (config.useDynamicS3RoleCredentialInTable()) { + val tableMetadataManager = NTableMetadataManager.getInstance(config, project); + tableMetadataManager.listAllTables().forEach(tableDesc -> SparderEnv + .addS3CredentialFromTableToSpark(tableMetadataManager.getOrCreateTableExt(tableDesc), ss)); + } + + if (!config.isUTEnv()) { + Unsafe.setProperty("kylin.env", config.getDeployEnv()); + } + logger.info("Start job"); + infos.startJob(); + // should be invoked after method prepareSparkSession + extraInit(); + + waiteForResourceSuccess(); + doExecute(); + // Output metadata to another folder + val resourceStore = ResourceStore.getKylinMetaStore(config); + val outputConfig = KylinConfig.createKylinConfig(config); + outputConfig.setMetadataUrl(getParam(NBatchConstants.P_OUTPUT_META_URL)); + MetadataStore.createMetadataStore(outputConfig).dump(resourceStore); + } catch (Exception e) { + handleException(e); + } finally { + if (infos != null) { + infos.jobEnd(); + } + destroySparkSession(); + extraDestroy(); + executeFinish(); + } + } + + protected void handleException(Exception e) throws Exception { + throw e; + } + + private SparkSession createSpark(SparkConf sparkConf) { + SparkSession.Builder sessionBuilder = SparkSession.builder() + .withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() { + @Override + public BoxedUnit apply(SparkSessionExtensions v1) { + v1.injectPostHocResolutionRule(new AbstractFunction1<SparkSession, Rule<LogicalPlan>>() { + @Override + public Rule<LogicalPlan> apply(SparkSession session) { + return new AlignmentTableStats(session); + } + }); + return BoxedUnit.UNIT; + } + }).enableHiveSupport().config(sparkConf) + .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"); + + // If this is UT and SparkSession is already created, then use SparkSession. 
+ // Otherwise, we always use KylinSession + boolean createWithSparkSession = !isJobOnCluster(sparkConf) && SparderEnv.isSparkAvailable(); + if (createWithSparkSession) { + boolean isKylinSession = SparderEnv.getSparkSession() instanceof KylinSession; + createWithSparkSession = !isKylinSession; + } + + if (createWithSparkSession) { + return sessionBuilder.getOrCreate(); + } else { + return KylinSession$.MODULE$.KylinBuilder(sessionBuilder).buildCluster().getOrCreateKylinSession(); + } + } + + public boolean isJobOnCluster(SparkConf conf) { + return !Utils.isLocalMaster(conf) && !config.isUTEnv(); + } + + protected void extraInit() { + } + + public void extraDestroy() { + if (clusterMonitor != null) { + clusterMonitor.shutdown(); + } + } + + protected abstract void doExecute() throws Exception; + + protected void onLayoutFinished(long layoutId) { + //do nothing + } + + protected void onExecuteFinished() { + //do nothing + } + + protected String calculateRequiredCores() throws Exception { + return SparkJobConstants.DEFAULT_REQUIRED_CORES; + } + + private void autoSetSparkConf(SparkConf sparkConf) throws Exception { + SparkConfHelper helper = new SparkConfHelper(); + // copy user defined spark conf + if (sparkConf.getAll() != null) { + Arrays.stream(sparkConf.getAll()).forEach(config -> helper.setConf(config._1, config._2)); + } + helper.setClusterManager(KylinBuildEnv.get().clusterManager()); + + chooseContentSize(helper); + + helper.setOption(SparkConfHelper.LAYOUT_SIZE, Integer.toString(layoutSize)); + helper.setOption(SparkConfHelper.REQUIRED_CORES, calculateRequiredCores()); + helper.setConf(COUNT_DISTICT, hasCountDistinct().toString()); + helper.generateSparkConf(); + helper.applySparkConf(sparkConf); + } + + private void waiteForResource(SparkConf sparkConf, KylinBuildEnv buildEnv) throws Exception { + val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null); + infos.recordStageId(waiteForResource.getId()); + waiteForResource.execute(); + } + + protected void waiteForResourceSuccess() throws Exception { + val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null); + waiteForResource.onStageFinished(true); + infos.recordStageId(""); + } + + protected void executeFinish() { + try { + getReport().executeFinish(getReportParams(), project, getJobId()); + } catch (Exception e) { + logger.error("executeFinish failed", e); + } + } + + protected void chooseContentSize(SparkConfHelper helper) { + Path shareDir = config.getJobTmpShareDir(project, jobId); + // add content size with unit + helper.setOption(SparkConfHelper.SOURCE_TABLE_SIZE, chooseContentSize(shareDir)); + } + + protected boolean checkRangePartitionTableIsExist(NDataModel modelDesc) { + return modelDesc.getAllTableRefs().stream().anyMatch(p -> p.getTableDesc().isRangePartition()); + } + + protected String chooseContentSize(Path shareDir) { + // return size with unit + return ResourceDetectUtils.getMaxResourceSize(shareDir) + "b"; + } + + protected Boolean hasCountDistinct() throws IOException { + Path countDistinct = new Path(config.getJobTmpShareDir(project, jobId), + ResourceDetectUtils.countDistinctSuffix()); + FileSystem fileSystem = countDistinct.getFileSystem(HadoopUtil.getCurrentConfiguration()); + Boolean exist; + if (fileSystem.exists(countDistinct)) { + exist = ResourceDetectUtils.readResourcePathsAs(countDistinct); + } else { + exist = false; + logger.debug("File count_distinct.json doesn't exist, set hasCountDistinct to false."); + } + logger.debug("Exist count distinct measure: {}", exist); + return 
exist; + } + + public void logJobInfo() { + try { + logger.info(generateInfo()); + if (KylinConfig.getInstanceFromEnv().skipRecordJobExecutionTime()) { + logger.info("skip record job wait and run time"); + return; + } + Map<String, String> extraInfo = new HashMap<>(); + extraInfo.put("yarn_job_wait_time", ((Long) KylinBuildEnv.get().buildJobInfos().waitTime()).toString()); + extraInfo.put("yarn_job_run_time", ((Long) KylinBuildEnv.get().buildJobInfos().buildTime()).toString()); + + getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/wait_and_run_time", project, jobId, + extraInfo); + } catch (Exception e) { + logger.warn("Error occurred when generate job info.", e); + } + } + + private Map<String, String> getReportParams() { + val reportParams = new HashMap<String, String>(); + reportParams.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout())); + reportParams.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true)); + return reportParams; + } + + protected String generateInfo() { + return LogJobInfoUtils.sparkApplicationInfo(); + } + + public Set<String> getIgnoredSnapshotTables() { + return NSparkCubingUtil.toIgnoredTableSet(getParam(NBatchConstants.P_IGNORED_SNAPSHOT_TABLES)); + } + + protected Map<String, String> getSparkConfigOverride(KylinConfig config) { + return config.getSparkConfigOverride(); + } + + protected void checkDateFormatIfExist(String project, String modelId) throws Exception { + if (config.isUTEnv()) { + return; + } + val modelManager = NDataModelManager.getInstance(config, project); + NDataModel modelDesc = modelManager.getDataModelDesc(modelId); + if (checkRangePartitionTableIsExist(modelDesc)) { + logger.info("Range partitioned tables do not support pushdown, so do not need to perform subsequent logic"); + return; + } + + val partitionDesc = modelDesc.getPartitionDesc(); + if (PartitionDesc.isEmptyPartitionDesc(partitionDesc) + || org.apache.commons.lang.StringUtils.isEmpty(partitionDesc.getPartitionDateFormat())) + return; + + if (CatalogTableType.VIEW().name().equals(modelDesc.getRootFactTable().getTableDesc().getTableType())) + return; + + String partitionColumn = modelDesc.getPartitionDesc().getPartitionDateColumnRef().getExpressionInSourceDB(); + + SparkSession sparkSession = atomicSparkSession.get(); + try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance() + .overrideSparkSession(sparkSession)) { + String dateString = PushDownUtil.getFormatIfNotExist(modelDesc.getRootFactTableName(), partitionColumn, + project); + val sdf = new SimpleDateFormat(modelDesc.getPartitionDesc().getPartitionDateFormat(), + Locale.getDefault(Locale.Category.FORMAT)); + val date = sdf.parse(dateString); + if (date == null || !dateString.equals(sdf.format(date))) { + throw new NoRetryException("date format not match"); + } + } catch (KylinException ignore) { + // ignore it when pushdown return empty row + } catch (ParseException | NoRetryException e) { + throw new NoRetryException("date format not match"); + } + } + + private void exchangeSparkConf(SparkConf sparkConf) throws Exception { + if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) { + Map<String, String> baseSparkConf = getSparkConfigOverride(config); + if (!baseSparkConf.isEmpty()) { + baseSparkConf.forEach(sparkConf::set); + String baseSparkConfStr = JsonUtil.writeValueAsString(baseSparkConf); + logger.info("Override user-defined spark conf: {}", baseSparkConfStr); + } + if (config.isAutoSetSparkConf()) { + logger.info("Set 
spark conf automatically."); + try { + autoSetSparkConf(sparkConf); + } catch (Exception e) { + logger.warn("Auto set spark conf failed. Load spark conf from system properties", e); + } + } + } + + atomicSparkConf.set(sparkConf); + } + + private void exchangeSparkSession(SparkConf sparkConf) { + SparkSession sparkSession = atomicSparkSession.get(); + if (Objects.nonNull(sparkSession)) { + // destroy previous spark session + destroySparkSession(); + } + + sparkSession = createSpark(sparkConf); + if (!config.isUTEnv() && !sparkConf.get("spark.master").startsWith("k8s")) { + getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/spark", project, jobId, + getTrackingInfo(sparkSession, config.isTrackingUrlIpAddressEnabled())); + } + + // for spark metrics + JobMetricsUtils.registerListener(sparkSession); + SparderEnv.registerListener(sparkSession.sparkContext()); + + //#8341 + SparderEnv.setSparkSession(sparkSession); + UdfManager.create(sparkSession); + + /// + atomicSparkSession.set(sparkSession); + } + + private void prepareSparkSession() throws NoRetryException { + SparkConf sparkConf = atomicSparkConf.get(); + if (Objects.isNull(sparkConf)) { + // shouldn't reach here + throw new NoRetryException("spark conf shouldn't be null"); + } + + /// SegmentBuildJob only!!! + if (config.isSnapshotSpecifiedSparkConf() && (this instanceof SegmentBuildJob)) { + // snapshot specified spark conf, based on the exchanged spark conf. + SparkConf clonedSparkConf = sparkConf.clone(); + Map<String, String> snapshotSparkConf = config.getSnapshotBuildingConfigOverride(); + snapshotSparkConf.forEach(clonedSparkConf::set); + logger.info("exchange sparkSession using snapshot specified sparkConf"); + exchangeSparkSession(clonedSparkConf); + return; + } + // normal logic + exchangeSparkSession(sparkConf); + } + + private void destroySparkSession() { + SparkSession sparkSession = atomicSparkSession.get(); + if (Objects.isNull(sparkSession)) { + logger.info("no initialized sparkSession instance"); + return; + } + if (sparkSession.conf().get("spark.master").startsWith("local")) { + // for UT use? but very strange for resource detect mode (spark local). + return; + } + JobMetricsUtils.unRegisterListener(sparkSession); + sparkSession.stop(); + } + + private void monitorSparkMaster() { + clusterMonitor = new ClusterMonitor(); + clusterMonitor.monitorSparkMaster(atomicBuildEnv, atomicSparkSession, atomicDisconnectSparkMasterTimes, + atomicUnreachableSparkMaster); + } + +} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java new file mode 100644 index 0000000000..31974f65bc --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.application; + +import org.apache.spark.application.JobWorkSpace; + +public class SparkEntry { + public static void main(String[] args) { + JobWorkSpace.execute(args); + } +} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java new file mode 100644 index 0000000000..5411c31bbd --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.engine.spark.source; + +import static org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker.isNeedCreateHiveTemporaryTable; + +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.engine.spark.NSparkCubingEngine; +import org.apache.kylin.engine.spark.job.KylinBuildEnv; +import org.apache.kylin.engine.spark.utils.HiveTransactionTableHelper; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.SparderTypeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +public class NSparkCubingSourceInput implements NSparkCubingEngine.NSparkCubingSource { + private static final Logger logger = LoggerFactory.getLogger(NSparkCubingSourceInput.class); + + @Override + public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> params) { + ColumnDesc[] columnDescs = table.getColumns(); + List<String> tblColNames = Lists.newArrayListWithCapacity(columnDescs.length); + StructType kylinSchema = new StructType(); + for (ColumnDesc columnDesc : columnDescs) { + if (!columnDesc.isComputedColumn()) { + kylinSchema = kylinSchema.add(columnDesc.getName(), + SparderTypeUtil.toSparkType(columnDesc.getType(), false), true); + tblColNames.add("`" + columnDesc.getName() + "`"); + } + } + String[] colNames = tblColNames.toArray(new String[0]); + String colString = Joiner.on(",").join(colNames); + String sql; + KylinConfig kylinConfig = KylinBuildEnv.get().kylinConfig(); + logger.info("isRangePartition:{};isTransactional:{};isReadTransactionalTableEnabled:{}", + table.isRangePartition(), table.isTransactional(), kylinConfig.isReadTransactionalTableEnabled()); + if (isNeedCreateHiveTemporaryTable(table.isRangePartition(), table.isTransactional(), + kylinConfig.isReadTransactionalTableEnabled())) { + sql = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(table, params, colString, + KylinBuildEnv.get()); + } else { + sql = String.format(Locale.ROOT, "select %s from %s", colString, table.getIdentity()); + } + Dataset<Row> df = ss.sql(sql); + StructType sparkSchema = df.schema(); + logger.debug("Source data sql is: {}", sql); + logger.debug("Kylin schema: {}", kylinSchema.treeString()); + return df.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema)); + } +} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java new file mode 100644 index 0000000000..6a501bc71e --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.source; + +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metadata.model.IBuildable; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.ISampleDataDeployer; +import org.apache.kylin.source.ISource; +import org.apache.kylin.source.ISourceMetadataExplorer; +import org.apache.kylin.engine.spark.NSparkCubingEngine; + +public class NSparkDataSource implements ISource { + // for reflection + public NSparkDataSource(KylinConfig config) { + + } + + @Override + public ISourceMetadataExplorer getSourceMetadataExplorer() { + return new NSparkMetadataExplorer(); + } + + @Override + public <I> I adaptToBuildEngine(Class<I> engineInterface) { + if (engineInterface == NSparkCubingEngine.NSparkCubingSource.class) { + return (I) new NSparkCubingSourceInput(); + } else { + throw new IllegalArgumentException("Unsupported engine interface: " + engineInterface); + } + } + + @Override + public IReadableTable createReadableTable(TableDesc tableDesc) { + return new NSparkTable(tableDesc); + } + + @Override + public SegmentRange enrichSourcePartitionBeforeBuild(IBuildable buildable, SegmentRange srcPartition) { + return srcPartition; + } + + @Override + public ISampleDataDeployer getSampleDataDeployer() { + return new NSparkMetadataExplorer(); + } + + @Override + public SegmentRange getSegmentRange(String start, String end) { + start = StringUtils.isEmpty(start) ? "0" : start; + end = StringUtils.isEmpty(end) ? "" + Long.MAX_VALUE : end; + return new SegmentRange.TimePartitionedSegmentRange(Long.parseLong(start), Long.parseLong(end)); + } + + @Override + public boolean supportBuildSnapShotByPartition() { + return true; + } + +} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java new file mode 100644 index 0000000000..7e4f7deea0 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.engine.spark.source; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.kylin.common.KapConfig; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.ISourceAware; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.source.ISampleDataDeployer; +import org.apache.kylin.source.ISourceMetadataExplorer; +import org.apache.kylin.metadata.model.NTableMetadataManager; +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparderEnv; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalog.Database; +import org.apache.spark.sql.catalyst.catalog.CatalogTableType; +import org.apache.spark.sql.internal.SQLConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.clearspring.analytics.util.Lists; +import com.google.common.collect.Sets; + +import lombok.val; + +public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleDataDeployer, Serializable { + + private static final Logger logger = LoggerFactory.getLogger(NSparkMetadataExplorer.class); + + public static String generateCreateSchemaSql(String schemaName) { + return String.format(Locale.ROOT, "CREATE DATABASE IF NOT EXISTS %s", schemaName); + } + + public static String[] generateCreateTableSql(TableDesc tableDesc) { + String dropSql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity(); + + StringBuilder ddl = new StringBuilder(); + ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n"); + ddl.append("(" + "\n"); + + for (int i = 0; i < tableDesc.getColumns().length; i++) { + ColumnDesc col = tableDesc.getColumns()[i]; + if (i > 0) { + ddl.append(","); + } + ddl.append(col.getName() + " " + col.getDatatype() + "\n"); + } + + ddl.append(")" + "\n"); + ddl.append("USING com.databricks.spark.csv"); + + return new String[] { dropSql, ddl.toString() }; + } + + public NSparkTableMetaExplorer getTableMetaExplorer() { + return new NSparkTableMetaExplorer(); + } + + @Override + public List<String> listDatabases() throws Exception { + Dataset<Row> dataset = SparderEnv.getSparkSession().sql("show databases").select("namespace"); + return dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList()); + } + + @Override + public List<String> listTables(String database) throws Exception { + val ugi = UserGroupInformation.getCurrentUser(); + val config = KylinConfig.getInstanceFromEnv(); + val spark = SparderEnv.getSparkSession(); + + List<String> tables = Lists.newArrayList(); + try { + String sql = "show tables"; + if (StringUtils.isNotBlank(database)) { + sql = String.format(Locale.ROOT, sql + " in %s", database); + } + Dataset<Row> dataset = SparderEnv.getSparkSession().sql(sql).select("tableName"); + tables = 
dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList()); + + if (config.getTableAccessFilterEnable() && config.getKerberosProjectLevelEnable() + && UserGroupInformation.isSecurityEnabled()) { + List<String> accessTables = Lists.newArrayList(); + for (String table : tables) { + val tableName = database + "." + table; + if (checkTableAccess(tableName)) { + accessTables.add(table); + } + } + return accessTables; + } + } catch (Exception e) { + logger.error("List hive tables failed. user: {}, db: {}", ugi.getUserName(), database); + } + + return tables; + } + + public boolean checkTableAccess(String tableName) { + boolean isAccess = true; + try { + val spark = SparderEnv.getSparkSession(); + val sparkTable = spark.catalog().getTable(tableName); + Set<String> needCheckTables = Sets.newHashSet(); + if (sparkTable.tableType().equals(CatalogTableType.VIEW().name())) { + needCheckTables = SparkSqlUtil.getViewOrignalTables(tableName, SparderEnv.getSparkSession()); + } else { + needCheckTables.add(tableName); + } + String hiveSpecFsLocation = spark.sessionState().conf().getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION()); + FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem() + : HadoopUtil.getFileSystem(hiveSpecFsLocation); + for (String table : needCheckTables) { + fs.listStatus(new Path(getLoc(spark, table, hiveSpecFsLocation))); + } + } catch (Exception e) { + isAccess = false; + try { + logger.error("Read hive table {} error:{}, ugi name: {}.", tableName, e.getMessage(), + UserGroupInformation.getCurrentUser().getUserName()); + } catch (IOException ex) { + logger.error("fetch user curr ugi info error.", e); + } + } + return isAccess; + } + + @Override + public Pair<TableDesc, TableExtDesc> loadTableMetadata(final String database, String tableName, String prj) + throws Exception { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + NTableMetadataManager metaMgr = NTableMetadataManager.getInstance(config, prj); + + NSparkTableMeta tableMeta = getTableMetaExplorer().getSparkTableMeta(database, tableName); + TableDesc tableDesc = metaMgr.getTableDesc(database + "." 
+ tableName); + + // make a new TableDesc instance, don't modify the one in use + if (tableDesc == null) { + tableDesc = new TableDesc(); + tableDesc.setDatabase(database.toUpperCase(Locale.ROOT)); + tableDesc.setName(tableName.toUpperCase(Locale.ROOT)); + tableDesc.setUuid(RandomUtil.randomUUIDStr()); + tableDesc.setLastModified(0); + } else { + tableDesc = new TableDesc(tableDesc); + } + + if (tableMeta.tableType != null) { + tableDesc.setTableType(tableMeta.tableType); + } + //set table type = spark + tableDesc.setSourceType(ISourceAware.ID_SPARK); + tableDesc.setTransactional(tableMeta.isTransactional); + tableDesc.setRangePartition(tableMeta.isRangePartition); + + Set<String> partColumnSet = Optional.ofNullable(tableMeta.partitionColumns) // + .orElseGet(Collections::emptyList).stream().map(field -> field.name) // + .collect(Collectors.toSet()); + int columnNumber = tableMeta.allColumns.size(); + List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber); + for (int i = 0; i < columnNumber; i++) { + NSparkTableMeta.SparkTableColumnMeta field = tableMeta.allColumns.get(i); + ColumnDesc cdesc = new ColumnDesc(); + cdesc.setName(field.name.toUpperCase(Locale.ROOT)); + cdesc.setCaseSensitiveName(field.name); + // use "double" in kylin for "float" + if ("float".equalsIgnoreCase(field.dataType)) { + cdesc.setDatatype("double"); + } else { + cdesc.setDatatype(field.dataType); + } + cdesc.setId(String.valueOf(i + 1)); + cdesc.setComment(field.comment); + cdesc.setPartitioned(partColumnSet.contains(field.name)); + columns.add(cdesc); + } + tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber])); + List<String> partCols = tableMeta.partitionColumns.stream().map(col -> col.name).collect(Collectors.toList()); + if (!partCols.isEmpty()) { + tableDesc.setPartitionColumn(partCols.get(0).toUpperCase(Locale.ROOT)); + } else { + tableDesc.setPartitionColumn(null); + } + StringBuilder partitionColumnBuilder = new StringBuilder(); + for (int i = 0, n = tableMeta.partitionColumns.size(); i < n; i++) { + if (i > 0) + partitionColumnBuilder.append(", "); + partitionColumnBuilder.append(tableMeta.partitionColumns.get(i).name.toUpperCase(Locale.ROOT)); + } + + TableExtDesc tableExtDesc = new TableExtDesc(); + tableExtDesc.setIdentity(tableDesc.getIdentity()); + tableExtDesc.setUuid(RandomUtil.randomUUIDStr()); + tableExtDesc.setLastModified(0); + tableExtDesc.init(prj); + + tableExtDesc.addDataSourceProp(TableExtDesc.LOCATION_PROPERTY_KEY, tableMeta.sdLocation); + tableExtDesc.addDataSourceProp("owner", tableMeta.owner); + tableExtDesc.addDataSourceProp("create_time", tableMeta.createTime); + tableExtDesc.addDataSourceProp("last_access_time", tableMeta.lastAccessTime); + tableExtDesc.addDataSourceProp("partition_column", partitionColumnBuilder.toString()); + tableExtDesc.addDataSourceProp("total_file_size", String.valueOf(tableMeta.fileSize)); + tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(tableMeta.fileNum)); + tableExtDesc.addDataSourceProp("hive_inputFormat", tableMeta.sdInputFormat); + tableExtDesc.addDataSourceProp("hive_outputFormat", tableMeta.sdOutputFormat); + tableExtDesc.addDataSourceProp(TableExtDesc.S3_ROLE_PROPERTY_KEY, tableMeta.s3Role); + tableExtDesc.addDataSourceProp(TableExtDesc.S3_ENDPOINT_KEY, tableMeta.s3Endpoint); + return Pair.newPair(tableDesc, tableExtDesc); + } + + @Override + public List<String> getRelatedKylinResources(TableDesc table) { + return Collections.emptyList(); + } + + @Override + public boolean checkDatabaseAccess(String 
database) throws Exception { + boolean hiveDBAccessFilterEnable = KapConfig.getInstanceFromEnv().getDBAccessFilterEnable(); + if (hiveDBAccessFilterEnable) { + logger.info("Check database {} access start.", database); + try { + Database db = SparderEnv.getSparkSession().catalog().getDatabase(database); + } catch (AnalysisException e) { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + logger.info("The current user: {} does not have permission to access database {}", ugi.getUserName(), + database); + return false; + } + } + + return true; + } + + @Override + public boolean checkTablesAccess(Set<String> tables) { + return tables.stream().allMatch(this::checkTableAccess); + } + + @Override + public Set<String> getTablePartitions(String database, String table, String prj, String partCol) { + return getTableMetaExplorer().checkAndGetTablePartitions(database, table, partCol); + } + + @Override + public void createSampleDatabase(String database) throws Exception { + SparderEnv.getSparkSession().sql(generateCreateSchemaSql(database)); + } + + @Override + public void createSampleTable(TableDesc table) throws Exception { + String[] createTableSqls = generateCreateTableSql(table); + for (String sql : createTableSqls) { + SparderEnv.getSparkSession().sql(sql); + } + } + + @Override + public void loadSampleData(String tableName, String tableFileDir) throws Exception { + Dataset<Row> dataset = SparderEnv.getSparkSession().read().csv(tableFileDir + "/" + tableName + ".csv").toDF(); + if (tableName.indexOf(".") > 0) { + tableName = tableName.substring(tableName.indexOf(".") + 1); + } + dataset.createOrReplaceTempView(tableName); + } + + @Override + public void createWrapperView(String origTableName, String viewName) throws Exception { + throw new UnsupportedOperationException("unsupport create wrapper view"); + } + + public String getLoc(SparkSession spark, String table, String hiveSpecFsLocation) { + String loc = spark.sql("desc formatted " + table).where("col_name == 'Location'").head().getString(1); + if (null == hiveSpecFsLocation || null == loc) { + return loc; + } + return loc.replace("hdfs://hacluster", hiveSpecFsLocation); + } +} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java new file mode 100644 index 0000000000..6ab6643154 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark.source; + +import java.io.IOException; +import java.util.Locale; + +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.IReadableTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NSparkTable implements IReadableTable { + + private static final Logger logger = LoggerFactory.getLogger(NSparkTable.class); + + final private String database; + final private String tableName; + + public NSparkTable(TableDesc tableDesc) { + this.database = tableDesc.getDatabase(); + this.tableName = tableDesc.getName(); + } + + @Override + public TableReader getReader() throws IOException { + return new NSparkTableReader(database, tableName); + } + + @Override + public TableSignature getSignature() throws IOException { + // TODO: 07/12/2017 get modify time + String path = String.format(Locale.ROOT, "%s.%s", database, tableName); + long lastModified = System.currentTimeMillis(); // assume table is ever changing + int size = 0; + return new TableSignature(path, size, lastModified); + } + + @Override + public boolean exists() { + return true; + } + + @Override + public String toString() { + return "spark: database=[" + database + "], table=[" + tableName + "]"; + } + +} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java new file mode 100644 index 0000000000..29f48bfbfc --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.engine.spark.source; + +import java.util.List; + +public class NSparkTableMeta { + public static class SparkTableColumnMeta { + String name; + String dataType; + String comment; + + public SparkTableColumnMeta(String name, String dataType, String comment) { + this.name = name; + this.dataType = dataType; + this.comment = comment; + } + + @Override + public String toString() { + return "SparkTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='" + + comment + '\'' + '}'; + } + + public String getName() { + return name; + } + + public String getDataType() { + return dataType; + } + + public String getComment() { + return comment; + } + } + + String tableName; + String sdLocation; // sd is short for storage descriptor + String sdInputFormat; + String sdOutputFormat; + String owner; + String provider; + String tableType; + String createTime; + String lastAccessTime; + long fileSize; + long fileNum; + boolean isNative; + List<SparkTableColumnMeta> allColumns; + List<SparkTableColumnMeta> partitionColumns; + boolean isTransactional; + boolean isRangePartition; + String s3Role; + String s3Endpoint; + + public List<SparkTableColumnMeta> getAllColumns() { + return allColumns; + } + + public NSparkTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, + String owner, String provider, String tableType, String createTime, String lastAccessTime, long fileSize, + long fileNum, boolean isNative, List<SparkTableColumnMeta> allColumns, + List<SparkTableColumnMeta> partitionColumns, boolean isTransactional, boolean isRangePartition, + String s3Role, String s3Endpoint) { + this.tableName = tableName; + this.sdLocation = sdLocation; + this.sdInputFormat = sdInputFormat; + this.sdOutputFormat = sdOutputFormat; + this.owner = owner; + this.provider = provider; + this.tableType = tableType; + this.createTime = createTime; + this.lastAccessTime = lastAccessTime; + this.fileSize = fileSize; + this.fileNum = fileNum; + this.isNative = isNative; + this.allColumns = allColumns; + this.partitionColumns = partitionColumns; + this.isTransactional = isTransactional; + this.isRangePartition = isRangePartition; + this.s3Role = s3Role; + this.s3Endpoint = s3Endpoint; + } + + @Override + public String toString() { + return "SparkTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\'' + + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\'' + + ", owner='" + owner + '\'' + ", provider='" + provider + '\'' + ", tableType='" + tableType + '\'' + + ", createTime='" + createTime + '\'' + ", lastAccessTime=" + lastAccessTime + ", fileSize=" + fileSize + + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" + allColumns + + ", partitionColumns=" + partitionColumns + ", isTransactional=" + isTransactional + + ", isRangePartition=" + isRangePartition + ", s3Role=" + s3Role + ", s3Endpoint=" + s3Endpoint + '}'; + } +} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java new file mode 100644 index 0000000000..2d4a9eeae1 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.source; + +import java.util.List; + +import com.google.common.collect.Lists; + +public class NSparkTableMetaBuilder { + private String tableName; + private String sdLocation; + private String sdInputFormat; + private String sdOutputFormat; + private String owner; + private String provider; + private String tableType; + private String createTime; + private String lastAccessTime; + private long fileSize; + private long fileNum; + private boolean isNative = true; + private List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayList(); + private List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns = Lists.newArrayList(); + private boolean isTransactional = false; + private boolean isRangePartition = false; + private String s3Role; + private String s3Endpoint; + + public NSparkTableMetaBuilder setTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public NSparkTableMetaBuilder setSdLocation(String sdLocation) { + this.sdLocation = sdLocation; + return this; + } + + public NSparkTableMetaBuilder setSdInputFormat(String sdInputFormat) { + this.sdInputFormat = sdInputFormat; + return this; + } + + public NSparkTableMetaBuilder setSdOutputFormat(String sdOutputFormat) { + this.sdOutputFormat = sdOutputFormat; + return this; + } + + public NSparkTableMetaBuilder setOwner(String owner) { + this.owner = owner; + return this; + } + + public NSparkTableMetaBuilder setProvider(String provider) { + this.provider = provider; + return this; + } + + public NSparkTableMetaBuilder setTableType(String tableType) { + this.tableType = tableType; + return this; + } + + public NSparkTableMetaBuilder setCreateTime(String createTime) { + this.createTime = createTime; + return this; + } + + public NSparkTableMetaBuilder setLastAccessTime(String lastAccessTime) { + this.lastAccessTime = lastAccessTime; + return this; + } + + public NSparkTableMetaBuilder setFileSize(long fileSize) { + this.fileSize = fileSize; + return this; + } + + public NSparkTableMetaBuilder setFileNum(long fileNum) { + this.fileNum = fileNum; + return this; + } + + public NSparkTableMetaBuilder setIsNative(boolean isNative) { + this.isNative = isNative; + return this; + } + + public NSparkTableMetaBuilder setAllColumns(List<NSparkTableMeta.SparkTableColumnMeta> allColumns) { + this.allColumns = allColumns; + return this; + } + + public NSparkTableMetaBuilder setPartitionColumns(List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns) { + this.partitionColumns = partitionColumns; + return this; + } + + public NSparkTableMetaBuilder setIsTransactional(boolean isTransactional) { + this.isTransactional = isTransactional; + return this; + } + + public NSparkTableMetaBuilder setIsRangePartition(boolean isRangePartition) { + this.isRangePartition = isRangePartition; 
+ return this; + } + + public NSparkTableMetaBuilder setS3Role(String s3Role) { + this.s3Role = s3Role; + return this; + } + + public NSparkTableMetaBuilder setS3Endpoint(String s3Endpoint) { + this.s3Endpoint = s3Endpoint; + return this; + } + + public NSparkTableMeta createSparkTableMeta() { + return new NSparkTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, provider, tableType, + createTime, lastAccessTime, fileSize, fileNum, isNative, allColumns, partitionColumns, isTransactional, + isRangePartition, s3Role, s3Endpoint); + } +} \ No newline at end of file diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java new file mode 100644 index 0000000000..033edb84f9 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.engine.spark.source; + +import java.io.Serializable; +import java.net.URI; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.jnet.Installer; +import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; +import org.apache.spark.sql.SparderEnv; +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.catalyst.catalog.CatalogTableType; +import org.apache.spark.sql.catalyst.catalog.SessionCatalog; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import scala.Option; +import scala.collection.JavaConversions; + +public class NSparkTableMetaExplorer implements Serializable { + + private static final Logger logger = LoggerFactory.getLogger(NSparkTableMetaExplorer.class); + + enum PROVIDER { + HIVE("hive"), UNSPECIFIED(""); + + private static final PROVIDER[] ALL = new PROVIDER[] { HIVE }; + private String value; + + PROVIDER(String value) { + this.value = value; + } + + public static PROVIDER fromString(Option<String> value) { + if (value.isEmpty()) { + return UNSPECIFIED; + } + + for (PROVIDER provider : ALL) { + if (provider.value.equals(value.get())) { + return provider; + } + } + return UNSPECIFIED; + } + } + + private static final List<String> UNSUPPORTED_TYPES = Lists.newArrayList("array", "map", "struct", "binary"); + private static final String CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"; + public static final String S3_ROLE_PROPERTY_KEY = "role"; + public static final String S3_ENDPOINT_PROPERTY_KEY = "s3_endpoint"; + + public NSparkTableMeta getSparkTableMeta(String database, String tableName) { + SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog(); + TableIdentifier tableIdentifier = TableIdentifier.apply(tableName, + Option.apply(database.isEmpty() ? null : database)); + CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier); + checkTableIsValid(tableMetadata, tableIdentifier, tableName); + return getSparkTableMeta(tableName, tableMetadata); + } + + public Set<String> checkAndGetTablePartitions(String database, String tableName, String partitionCol) { + SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog(); + TableIdentifier tableIdentifier = TableIdentifier.apply(tableName, + Option.apply(database.isEmpty() ? null : database)); + + CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier); + + String firstPartCol = tableMetadata.partitionColumnNames().isEmpty() ?
null + : tableMetadata.partitionColumnNames().head().toLowerCase(Locale.ROOT); + + if (!partitionCol.equalsIgnoreCase(firstPartCol)) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "table partition column %s does not match expected column %s", firstPartCol, partitionCol)); + } + return JavaConversions.seqAsJavaList(catalog.listPartitions(tableIdentifier, Option.empty())).stream() + .map(item -> JavaConversions.mapAsJavaMap(item.spec()).entrySet().stream() + .filter(entry -> partitionCol.equalsIgnoreCase(entry.getKey())) // + .findFirst() // + .map(Map.Entry::getValue) // + .orElse(null)) + .filter(Objects::nonNull).collect(Collectors.toSet()); + } + + private NSparkTableMeta getSparkTableMeta(String tableName, CatalogTable tableMetadata) { + NSparkTableMetaBuilder builder = new NSparkTableMetaBuilder(); + builder.setTableName(tableName); + builder.setAllColumns(getColumns(tableMetadata, tableMetadata.schema())); + builder.setOwner(tableMetadata.owner()); + builder.setCreateTime(tableMetadata.createTime() + ""); + builder.setLastAccessTime(tableMetadata.lastAccessTime() + ""); + builder.setTableType(tableMetadata.tableType().name()); + builder.setPartitionColumns(getColumns(tableMetadata, tableMetadata.partitionSchema())); + builder.setIsRangePartition(isRangePartition(tableMetadata)); + if (tableMetadata.storage().inputFormat().isDefined()) { + builder.setSdInputFormat(tableMetadata.storage().inputFormat().get()); + } + if (tableMetadata.storage().outputFormat().isDefined()) { + builder.setSdOutputFormat(tableMetadata.storage().outputFormat().get()); + } + Option<URI> uriOption = tableMetadata.storage().locationUri(); + if (uriOption.isDefined()) { + builder.setSdLocation(uriOption.get().toString()); + } + if (tableMetadata.provider().isDefined()) { + builder.setProvider(tableMetadata.provider().get()); + } + if (tableMetadata.properties().contains("totalSize")) { + builder.setFileSize(Long.parseLong(tableMetadata.properties().get("totalSize").get())); + } + if (tableMetadata.properties().contains("numFiles")) { + builder.setFileNum(Long.parseLong(tableMetadata.properties().get("numFiles").get())); + } + if (tableMetadata.properties().contains("transactional")) { + builder.setIsTransactional(Boolean.parseBoolean(tableMetadata.properties().get("transactional").get())); + } + if (tableMetadata.properties().contains(S3_ROLE_PROPERTY_KEY)) { + builder.setS3Role(tableMetadata.properties().get(S3_ROLE_PROPERTY_KEY).get()); + } + + if (tableMetadata.properties().contains(S3_ENDPOINT_PROPERTY_KEY)) { + builder.setS3Endpoint(tableMetadata.properties().get(S3_ENDPOINT_PROPERTY_KEY).get()); + } + return builder.createSparkTableMeta(); + } + + private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema) { + return getColumns(tableMetadata, schema, true); + } + + private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema, + boolean isCheckRepeatColumn) { + List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayListWithCapacity(schema.size()); + Set<String> columnCacheTemp = Sets.newHashSet(); + for (org.apache.spark.sql.types.StructField field : schema.fields()) { + String type = field.dataType().simpleString(); + if (field.metadata().contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) { + type = field.metadata().getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY); + } + String finalType = type; + if (UNSUPPORTED_TYPES.stream().anyMatch(finalType::contains)) { + logger.info("Loading table {}, ignoring
column {}:{}", tableMetadata.identifier().identifier(), field.name(), + finalType); + continue; + } + if (isCheckRepeatColumn && columnCacheTemp.contains(field.name())) { + logger.info("The column {} is already included and does not need to be added again", field.name()); + continue; + } + columnCacheTemp.add(field.name()); + allColumns.add(new NSparkTableMeta.SparkTableColumnMeta(field.name(), type, + field.getComment().isDefined() ? field.getComment().get() : null)); + } + + return allColumns; + } + + private void checkTableIsValid(CatalogTable tableMetadata, TableIdentifier tableIdentifier, String tableName) { + if (CatalogTableType.VIEW().equals(tableMetadata.tableType())) { + try { + Installer.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); + SparderEnv.getSparkSession().table(tableIdentifier).queryExecution().analyzed(); + } catch (Throwable e) { + logger.error("Failed to parse view: " + tableName, e); + throw new RuntimeException("Failed to parse view: " + tableName + ", " + e.getMessage() + + " (there may be syntactic differences between Hive and Spark SQL)", e); + } + } + } + + private Boolean isRangePartition(CatalogTable tableMetadata) { + List<NSparkTableMeta.SparkTableColumnMeta> allColumns = getColumns(tableMetadata, tableMetadata.schema(), + false); + return allColumns.stream().collect(Collectors.groupingBy(p -> p.name)).values().stream() + .anyMatch(p -> p.size() > 1); + } +} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java new file mode 100644 index 0000000000..669f418c9c --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kylin.engine.spark.source; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; + +import org.apache.kylin.source.IReadableTable.TableReader; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparderEnv; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructField; + +public class NSparkTableReader implements TableReader { + private String dbName; + private String tableName; + private SparkSession ss; + private List<Row> records; + private Iterator<Row> iterator; + private Row currentRow; + + public NSparkTableReader(String dbName, String tableName) { + this.dbName = dbName; + this.tableName = tableName; + initialize(); + } + + public static String[] getRowAsStringArray(Row record) { + StructField[] fields = record.schema().fields(); + String[] arr = new String[fields.length]; + for (int i = 0; i < arr.length; i++) { + Object o = record.get(i); + arr[i] = (o == null) ? null : o.toString(); + } + return arr; + } + + private void initialize() { + ss = SparderEnv.getSparkSession(); + String master = ss.sparkContext().master(); + String tableIdentity = tableName; + // Spark SQL cannot use the database prefix when creating a temp view from CSV, but when working with Hive the database prefix is required + if (!master.toLowerCase(Locale.ROOT).contains("local")) { + tableIdentity = String.format(Locale.ROOT, "%s.%s", dbName, tableName); + } + records = SparkSqlUtil.queryAll(ss, tableIdentity); + iterator = records.iterator(); + } + + @Override + public boolean next() throws IOException { + boolean hasNext = iterator != null && iterator.hasNext(); + if (hasNext) { + currentRow = iterator.next(); + } + return hasNext; + } + + @Override + public String[] getRow() { + return getRowAsStringArray(currentRow); + } + + @Override + public void close() throws IOException { + this.records = null; + this.iterator = null; + this.currentRow = null; + } +} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java new file mode 100644 index 0000000000..cea86bc9fa --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kylin.engine.spark.source; + +import java.util.List; +import java.util.Locale; +import java.util.Set; + +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; +import org.apache.spark.sql.catalyst.catalog.CatalogTableType; + +import com.google.common.collect.Sets; + +import lombok.val; + +public class SparkSqlUtil { + public static Dataset<Row> query(SparkSession ss, String sql) { + return ss.sql(sql); + } + + public static List<Row> queryForList(SparkSession ss, String sql) { + return ss.sql(sql).collectAsList(); + } + + public static List<Row> queryAll(SparkSession ss, String table) { + String sql = String.format(Locale.ROOT, "select * from %s", table); + return queryForList(ss, sql); + } + + public static Set<String> getViewOrignalTables(String viewName, SparkSession spark) throws AnalysisException { + String viewText = spark.sql("desc formatted " + viewName).where("col_name = 'View Text'").head().getString(1); + val logicalPlan = spark.sessionState().sqlParser().parsePlan(viewText); + Set<String> viewTables = Sets.newHashSet(); + for (Object l : scala.collection.JavaConverters.seqAsJavaListConverter(logicalPlan.collectLeaves()).asJava()) { + if (l instanceof UnresolvedRelation) { + val tableName = ((UnresolvedRelation) l).tableName(); + //if nested view + if (spark.catalog().getTable(tableName).tableType().equals(CatalogTableType.VIEW().name())) { + viewTables.addAll(getViewOrignalTables(tableName, spark)); + } else { + viewTables.add(tableName); + } + } + } + return viewTables; + } +}
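For readers experimenting with the new engine-spark source classes, here is a minimal, illustrative sketch of the NSparkTableMetaBuilder API added above; the database, table, and column values are hypothetical sample data and are not part of the patch:

    import java.util.Arrays;

    import org.apache.kylin.engine.spark.source.NSparkTableMeta;
    import org.apache.kylin.engine.spark.source.NSparkTableMetaBuilder;

    public class TableMetaSketch {
        public static void main(String[] args) {
            // Build a table-meta object without calling the 18-argument NSparkTableMeta constructor directly.
            NSparkTableMeta meta = new NSparkTableMetaBuilder()
                    .setTableName("sample_db.sample_table") // hypothetical table
                    .setOwner("kylin")
                    .setProvider("hive")
                    .setAllColumns(Arrays.asList(
                            new NSparkTableMeta.SparkTableColumnMeta("id", "bigint", "row id"),
                            new NSparkTableMeta.SparkTableColumnMeta("dt", "date", "partition column")))
                    .setPartitionColumns(Arrays.asList(
                            new NSparkTableMeta.SparkTableColumnMeta("dt", "date", "partition column")))
                    .createSparkTableMeta();
            System.out.println(meta); // relies on NSparkTableMeta.toString()
        }
    }

In the patch itself this builder is populated by NSparkTableMetaExplorer.getSparkTableMeta from the Spark session catalog; the sketch only shows the call shape of the fluent setters.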