This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

The following commit(s) were added to refs/heads/kylin5 by this push:
     new 605b4f52ec KYLIN-5217 Create a initial commit
605b4f52ec is described below

commit 605b4f52ecd607fb6c6943d8014c1a2736254f17
Author: sunbiaobiao <sunbiaobia...@gmail.com>
AuthorDate: Tue Aug 9 10:47:02 2022 +0800

    KYLIN-5217 Create a initial commit
---
 .../kap/engine/spark/job/EnviromentAdaptor.java    |  29 -
 .../kap/engine/spark/job/IJobProgressReport.java   |  37 --
 .../kap/engine/spark/job/ISparkJobHandler.java     |  47 --
 .../kap/engine/spark/job/ParamsConstants.java      |  29 -
 .../kap/engine/spark/job/SparkAppDescription.java  |  53 --
 .../engine/spark/application/SparkApplication.java | 620 ---------------------
 .../kylin/engine/spark/application/SparkEntry.java |  27 -
 .../spark/source/NSparkCubingSourceInput.java      |  77 ---
 .../engine/spark/source/NSparkDataSource.java      |  79 ---
 .../spark/source/NSparkMetadataExplorer.java       | 313 -----------
 .../kylin/engine/spark/source/NSparkTable.java     |  65 ---
 .../kylin/engine/spark/source/NSparkTableMeta.java | 111 ----
 .../spark/source/NSparkTableMetaBuilder.java       | 140 -----
 .../spark/source/NSparkTableMetaExplorer.java      | 203 -------
 .../engine/spark/source/NSparkTableReader.java     |  87 ---
 .../kylin/engine/spark/source/SparkSqlUtil.java    |  66 ---
 16 files changed, 1983 deletions(-)

diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
deleted file mode 100644
index 0301d20c91..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-import java.util.Map;
-
-import org.apache.spark.sql.SparkSession;
-
-public interface EnviromentAdaptor {
-
-
-    Boolean prepareEnviroment(SparkSession spark, Map<String, String> params);
-}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
deleted file mode 100644
index 9e5e58f20f..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.kyligence.kap.engine.spark.job; - -import java.util.Map; - -public interface IJobProgressReport { - - boolean updateSparkJobInfo(Map<String, String> params, String url, String json); - - boolean updateSparkJobExtraInfo(Map<String, String> params, String url, String project, String jobId, - Map<String, String> extraInfo); - - default boolean executeFinish(Map<String, String> params, String project, String jobId) { - return true; - } - - default void initArgsParams(Map<String, String> argsParams) { - } - -} diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java deleted file mode 100644 index c167b52597..0000000000 --- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.kyligence.kap.engine.spark.job; - -import java.util.Map; -import java.util.Properties; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.exception.ExecuteException; - -public interface ISparkJobHandler { - - void killOrphanApplicationIfExists(String project, String jobStepId, KylinConfig config, - Map<String, String> sparkConf); - - void checkApplicationJar(KylinConfig config) throws ExecuteException; - - String createArgsFileOnRemoteFileSystem(KylinConfig config, String project, String jobId, - Map<String, String> params) throws ExecuteException; - - Object generateSparkCmd(KylinConfig config, SparkAppDescription desc); - - default void modifyDump(Properties props) { - } - - default void prepareEnviroment(String project, String jobStepId, Map<String, String> params) { - } - - Map<String, String> runSparkSubmit(Object cmd, String parentId) throws ExecuteException; - -} diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java deleted file mode 100644 index e218d319c7..0000000000 --- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.kyligence.kap.engine.spark.job; - -public class ParamsConstants { - - private ParamsConstants() { - throw new IllegalStateException("Utility class"); - } - - public static final String TIME_OUT = "time_out"; - public static final String JOB_TMP_DIR = "job_tmp_dir"; -} diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java deleted file mode 100644 index 8f105a5172..0000000000 --- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.kyligence.kap.engine.spark.job; - -import java.util.Map; -import java.util.Set; - -import lombok.Data; - -@Data -public class SparkAppDescription { - - private String hadoopConfDir; - - private String kylinJobJar; - - private String appArgs; - - private String jobNamePrefix; - - private String project; - - private String jobId; - - private int stepId; - - private String sparkSubmitClassName; - - private Map<String, String> sparkConf; - - private Set<String> sparkJars; - - private Set<String> sparkFiles; - - private String comma; - -} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java deleted file mode 100644 index cac224d44b..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java +++ /dev/null @@ -1,620 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.engine.spark.application; - -import static org.apache.kylin.engine.spark.job.StageType.WAITE_FOR_RESOURCE; -import static org.apache.kylin.engine.spark.utils.SparkConfHelper.COUNT_DISTICT; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.InetAddress; -import java.net.URI; -import java.net.UnknownHostException; -import java.nio.charset.Charset; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.cluster.IClusterManager; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.exception.KylinException; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.metadata.MetadataStore; -import org.apache.kylin.common.util.Application; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.common.util.TimeZoneUtils; -import org.apache.kylin.common.util.Unsafe; -import org.apache.kylin.engine.spark.job.BuildJobInfos; -import org.apache.kylin.engine.spark.job.KylinBuildEnv; -import org.apache.kylin.engine.spark.job.LogJobInfoUtils; -import org.apache.kylin.engine.spark.job.NSparkCubingUtil; -import org.apache.kylin.engine.spark.job.ResourceDetect; -import org.apache.kylin.engine.spark.job.RestfulJobProgressReport; -import org.apache.kylin.engine.spark.job.SegmentBuildJob; -import org.apache.kylin.engine.spark.job.SparkJobConstants; -import org.apache.kylin.engine.spark.job.UdfManager; -import org.apache.kylin.engine.spark.scheduler.ClusterMonitor; -import org.apache.kylin.engine.spark.utils.JobMetricsUtils; -import org.apache.kylin.engine.spark.utils.SparkConfHelper; -import org.apache.kylin.metadata.cube.model.NBatchConstants; -import org.apache.kylin.metadata.model.NDataModel; -import org.apache.kylin.metadata.model.NDataModelManager; -import org.apache.kylin.metadata.model.NTableMetadataManager; -import org.apache.kylin.metadata.model.PartitionDesc; -import org.apache.kylin.query.pushdown.SparkSubmitter; -import org.apache.kylin.query.util.PushDownUtil; -import org.apache.spark.SparkConf; -import org.apache.spark.application.NoRetryException; -import org.apache.spark.launcher.SparkLauncher; -import org.apache.spark.sql.KylinSession; -import org.apache.spark.sql.KylinSession$; -import org.apache.spark.sql.SparderEnv; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.SparkSessionExtensions; -import org.apache.spark.sql.catalyst.catalog.CatalogTableType; -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; -import org.apache.spark.sql.catalyst.rules.Rule; -import org.apache.spark.sql.execution.datasource.AlignmentTableStats; -import org.apache.spark.sql.hive.utils.ResourceDetectUtils; -import org.apache.spark.util.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Maps; - -import 
io.kyligence.kap.engine.spark.job.EnviromentAdaptor; -import io.kyligence.kap.engine.spark.job.IJobProgressReport; -import io.kyligence.kap.engine.spark.job.ParamsConstants; -import lombok.val; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -public abstract class SparkApplication implements Application { - private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class); - private Map<String, String> params = Maps.newHashMap(); - public static final String JOB_NAME_PREFIX = "job_step_"; - private IJobProgressReport report; - - protected volatile KylinConfig config; - protected volatile String jobId; - protected String project; - protected int layoutSize = -1; - protected BuildJobInfos infos; - /** - * path for spark app args on HDFS - */ - protected String path; - - private ClusterMonitor clusterMonitor; - private final AtomicLong atomicDisconnectSparkMasterTimes = new AtomicLong(0); - private final AtomicBoolean atomicUnreachableSparkMaster = new AtomicBoolean(false); - private final AtomicReference<SparkConf> atomicSparkConf = new AtomicReference<>(null); - private final AtomicReference<SparkSession> atomicSparkSession = new AtomicReference<>(null); - private final AtomicReference<KylinBuildEnv> atomicBuildEnv = new AtomicReference<>(null); - - public void execute(String[] args) { - try { - path = args[0]; - String argsLine = readArgsFromHDFS(); - params = JsonUtil.readValueAsMap(argsLine); - logger.info("Execute {} with args : {}", this.getClass().getName(), argsLine); - execute(); - } catch (Exception e) { - throw new RuntimeException("Error execute " + this.getClass().getName(), e); - } - } - - public AtomicBoolean getAtomicUnreachableSparkMaster() { - return atomicUnreachableSparkMaster; - } - - public final Map<String, String> getParams() { - return this.params; - } - - public final String getParam(String key) { - return this.params.get(key); - } - - public final void setParam(String key, String value) { - this.params.put(key, value); - } - - public final boolean contains(String key) { - return params.containsKey(key); - } - - public String getJobId() { - return jobId; - } - - public String getProject() { - return project; - } - - public KylinConfig getConfig() { - return config; - } - - public IJobProgressReport getReport() { - if (report == null) - return new RestfulJobProgressReport(); - return report; - } - - /// backwards compatibility, must have been initialized before invoking #doExecute. 
- protected SparkSession ss; - - public SparkSession getSparkSession() throws NoRetryException { - SparkSession sparkSession = atomicSparkSession.get(); - if (Objects.isNull(sparkSession)) { - // shouldn't reach here - throw new NoRetryException("spark session shouldn't be null"); - } - return sparkSession; - } - - public String readArgsFromHDFS() { - val fs = HadoopUtil.getFileSystem(path); - String argsLine = null; - Path filePath = new Path(path); - try (FSDataInputStream inputStream = fs.open(filePath)) { - BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, Charset.defaultCharset())); - argsLine = br.readLine(); - } catch (IOException e) { - logger.error("Error occurred when reading args file: {}", path, e); - } - return argsLine; - } - - /** - * get tracking url by application id - * - * @param sparkSession build sparkSession - * @return - */ - public String getTrackingUrl(IClusterManager clusterManager, SparkSession sparkSession) { - return clusterManager.getBuildTrackingUrl(sparkSession); - } - - private String tryReplaceHostAddress(String url) { - String originHost = null; - try { - URI uri = URI.create(url); - originHost = uri.getHost(); - String hostAddress = InetAddress.getByName(originHost).getHostAddress(); - return url.replace(originHost, hostAddress); - } catch (UnknownHostException uhe) { - logger.error("failed to get the ip address of {}, step back to use the origin tracking url.", originHost, - uhe); - return url; - } - } - - private Map<String, String> getTrackingInfo(SparkSession sparkSession, boolean ipAddressPreferred) { - IClusterManager clusterManager = atomicBuildEnv.get().clusterManager(); - String applicationId = sparkSession.sparkContext().applicationId(); - Map<String, String> extraInfo = new HashMap<>(); - extraInfo.put("yarn_app_id", applicationId); - try { - String trackingUrl = getTrackingUrl(clusterManager, sparkSession); - if (StringUtils.isBlank(trackingUrl)) { - logger.warn("Get tracking url of application {}, but empty url found.", applicationId); - return extraInfo; - } - if (ipAddressPreferred) { - trackingUrl = tryReplaceHostAddress(trackingUrl); - } - extraInfo.put("yarn_app_url", trackingUrl); - } catch (Exception e) { - logger.error("get tracking url failed!", e); - } - return extraInfo; - } - - protected void exchangeSparkSession() { - exchangeSparkSession(atomicSparkConf.get()); - } - - protected final void execute() throws Exception { - String hdfsMetalUrl = getParam(NBatchConstants.P_DIST_META_URL); - jobId = getParam(NBatchConstants.P_JOB_ID); - project = getParam(NBatchConstants.P_PROJECT_NAME); - if (getParam(NBatchConstants.P_LAYOUT_IDS) != null) { - layoutSize = StringUtils.split(getParam(NBatchConstants.P_LAYOUT_IDS), ",").length; - } - try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig - .setAndUnsetThreadLocalConfig(KylinConfig.loadKylinConfigFromHdfs(hdfsMetalUrl))) { - config = autoCloseConfig.get(); - report = (IJobProgressReport) ClassUtil.newInstance(config.getBuildJobProgressReporter()); - report.initArgsParams(getParams()); - //// KylinBuildEnv - final KylinBuildEnv buildEnv = KylinBuildEnv.getOrCreate(config); - atomicBuildEnv.set(buildEnv); - infos = buildEnv.buildJobInfos(); - infos.recordJobId(jobId); - infos.recordProject(project); - infos.recordJobStepId(System.getProperty("spark.driver.param.taskId", jobId)); - - monitorSparkMaster(); - - HadoopUtil.setCurrentConfiguration(new Configuration()); - //////// - exchangeSparkConf(buildEnv.sparkConf()); - - 
TimeZoneUtils.setDefaultTimeZone(config); - - /// wait until resource is enough - waiteForResource(atomicSparkConf.get(), buildEnv); - - /// - logger.info("Prepare job environment"); - prepareSparkSession(); - - /// backwards compatibility - ss = getSparkSession(); - val master = ss.conf().get(SparkLauncher.SPARK_MASTER, ""); - if (!master.equals("local")) { - EnviromentAdaptor adaptor = (EnviromentAdaptor) ClassUtil - .newInstance(config.getBuildJobEnviromentAdaptor()); - adaptor.prepareEnviroment(ss, params); - } - - if (config.useDynamicS3RoleCredentialInTable()) { - val tableMetadataManager = NTableMetadataManager.getInstance(config, project); - tableMetadataManager.listAllTables().forEach(tableDesc -> SparderEnv - .addS3CredentialFromTableToSpark(tableMetadataManager.getOrCreateTableExt(tableDesc), ss)); - } - - if (!config.isUTEnv()) { - Unsafe.setProperty("kylin.env", config.getDeployEnv()); - } - logger.info("Start job"); - infos.startJob(); - // should be invoked after method prepareSparkSession - extraInit(); - - waiteForResourceSuccess(); - doExecute(); - // Output metadata to another folder - val resourceStore = ResourceStore.getKylinMetaStore(config); - val outputConfig = KylinConfig.createKylinConfig(config); - outputConfig.setMetadataUrl(getParam(NBatchConstants.P_OUTPUT_META_URL)); - MetadataStore.createMetadataStore(outputConfig).dump(resourceStore); - } catch (Exception e) { - handleException(e); - } finally { - if (infos != null) { - infos.jobEnd(); - } - destroySparkSession(); - extraDestroy(); - executeFinish(); - } - } - - protected void handleException(Exception e) throws Exception { - throw e; - } - - private SparkSession createSpark(SparkConf sparkConf) { - SparkSession.Builder sessionBuilder = SparkSession.builder() - .withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() { - @Override - public BoxedUnit apply(SparkSessionExtensions v1) { - v1.injectPostHocResolutionRule(new AbstractFunction1<SparkSession, Rule<LogicalPlan>>() { - @Override - public Rule<LogicalPlan> apply(SparkSession session) { - return new AlignmentTableStats(session); - } - }); - return BoxedUnit.UNIT; - } - }).enableHiveSupport().config(sparkConf) - .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"); - - // If this is UT and SparkSession is already created, then use SparkSession. 
- // Otherwise, we always use KylinSession - boolean createWithSparkSession = !isJobOnCluster(sparkConf) && SparderEnv.isSparkAvailable(); - if (createWithSparkSession) { - boolean isKylinSession = SparderEnv.getSparkSession() instanceof KylinSession; - createWithSparkSession = !isKylinSession; - } - - if (createWithSparkSession) { - return sessionBuilder.getOrCreate(); - } else { - return KylinSession$.MODULE$.KylinBuilder(sessionBuilder).buildCluster().getOrCreateKylinSession(); - } - } - - public boolean isJobOnCluster(SparkConf conf) { - return !Utils.isLocalMaster(conf) && !config.isUTEnv(); - } - - protected void extraInit() { - } - - public void extraDestroy() { - if (clusterMonitor != null) { - clusterMonitor.shutdown(); - } - } - - protected abstract void doExecute() throws Exception; - - protected void onLayoutFinished(long layoutId) { - //do nothing - } - - protected void onExecuteFinished() { - //do nothing - } - - protected String calculateRequiredCores() throws Exception { - return SparkJobConstants.DEFAULT_REQUIRED_CORES; - } - - private void autoSetSparkConf(SparkConf sparkConf) throws Exception { - SparkConfHelper helper = new SparkConfHelper(); - // copy user defined spark conf - if (sparkConf.getAll() != null) { - Arrays.stream(sparkConf.getAll()).forEach(config -> helper.setConf(config._1, config._2)); - } - helper.setClusterManager(KylinBuildEnv.get().clusterManager()); - - chooseContentSize(helper); - - helper.setOption(SparkConfHelper.LAYOUT_SIZE, Integer.toString(layoutSize)); - helper.setOption(SparkConfHelper.REQUIRED_CORES, calculateRequiredCores()); - helper.setConf(COUNT_DISTICT, hasCountDistinct().toString()); - helper.generateSparkConf(); - helper.applySparkConf(sparkConf); - } - - private void waiteForResource(SparkConf sparkConf, KylinBuildEnv buildEnv) throws Exception { - val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null); - infos.recordStageId(waiteForResource.getId()); - waiteForResource.execute(); - } - - protected void waiteForResourceSuccess() throws Exception { - val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null); - waiteForResource.onStageFinished(true); - infos.recordStageId(""); - } - - protected void executeFinish() { - try { - getReport().executeFinish(getReportParams(), project, getJobId()); - } catch (Exception e) { - logger.error("executeFinish failed", e); - } - } - - protected void chooseContentSize(SparkConfHelper helper) { - Path shareDir = config.getJobTmpShareDir(project, jobId); - // add content size with unit - helper.setOption(SparkConfHelper.SOURCE_TABLE_SIZE, chooseContentSize(shareDir)); - } - - protected boolean checkRangePartitionTableIsExist(NDataModel modelDesc) { - return modelDesc.getAllTableRefs().stream().anyMatch(p -> p.getTableDesc().isRangePartition()); - } - - protected String chooseContentSize(Path shareDir) { - // return size with unit - return ResourceDetectUtils.getMaxResourceSize(shareDir) + "b"; - } - - protected Boolean hasCountDistinct() throws IOException { - Path countDistinct = new Path(config.getJobTmpShareDir(project, jobId), - ResourceDetectUtils.countDistinctSuffix()); - FileSystem fileSystem = countDistinct.getFileSystem(HadoopUtil.getCurrentConfiguration()); - Boolean exist; - if (fileSystem.exists(countDistinct)) { - exist = ResourceDetectUtils.readResourcePathsAs(countDistinct); - } else { - exist = false; - logger.debug("File count_distinct.json doesn't exist, set hasCountDistinct to false."); - } - logger.debug("Exist count distinct measure: {}", exist); - return 
exist; - } - - public void logJobInfo() { - try { - logger.info(generateInfo()); - if (KylinConfig.getInstanceFromEnv().skipRecordJobExecutionTime()) { - logger.info("skip record job wait and run time"); - return; - } - Map<String, String> extraInfo = new HashMap<>(); - extraInfo.put("yarn_job_wait_time", ((Long) KylinBuildEnv.get().buildJobInfos().waitTime()).toString()); - extraInfo.put("yarn_job_run_time", ((Long) KylinBuildEnv.get().buildJobInfos().buildTime()).toString()); - - getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/wait_and_run_time", project, jobId, - extraInfo); - } catch (Exception e) { - logger.warn("Error occurred when generate job info.", e); - } - } - - private Map<String, String> getReportParams() { - val reportParams = new HashMap<String, String>(); - reportParams.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout())); - reportParams.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true)); - return reportParams; - } - - protected String generateInfo() { - return LogJobInfoUtils.sparkApplicationInfo(); - } - - public Set<String> getIgnoredSnapshotTables() { - return NSparkCubingUtil.toIgnoredTableSet(getParam(NBatchConstants.P_IGNORED_SNAPSHOT_TABLES)); - } - - protected Map<String, String> getSparkConfigOverride(KylinConfig config) { - return config.getSparkConfigOverride(); - } - - protected void checkDateFormatIfExist(String project, String modelId) throws Exception { - if (config.isUTEnv()) { - return; - } - val modelManager = NDataModelManager.getInstance(config, project); - NDataModel modelDesc = modelManager.getDataModelDesc(modelId); - if (checkRangePartitionTableIsExist(modelDesc)) { - logger.info("Range partitioned tables do not support pushdown, so do not need to perform subsequent logic"); - return; - } - - val partitionDesc = modelDesc.getPartitionDesc(); - if (PartitionDesc.isEmptyPartitionDesc(partitionDesc) - || org.apache.commons.lang.StringUtils.isEmpty(partitionDesc.getPartitionDateFormat())) - return; - - if (CatalogTableType.VIEW().name().equals(modelDesc.getRootFactTable().getTableDesc().getTableType())) - return; - - String partitionColumn = modelDesc.getPartitionDesc().getPartitionDateColumnRef().getExpressionInSourceDB(); - - SparkSession sparkSession = atomicSparkSession.get(); - try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance() - .overrideSparkSession(sparkSession)) { - String dateString = PushDownUtil.getFormatIfNotExist(modelDesc.getRootFactTableName(), partitionColumn, - project); - val sdf = new SimpleDateFormat(modelDesc.getPartitionDesc().getPartitionDateFormat(), - Locale.getDefault(Locale.Category.FORMAT)); - val date = sdf.parse(dateString); - if (date == null || !dateString.equals(sdf.format(date))) { - throw new NoRetryException("date format not match"); - } - } catch (KylinException ignore) { - // ignore it when pushdown return empty row - } catch (ParseException | NoRetryException e) { - throw new NoRetryException("date format not match"); - } - } - - private void exchangeSparkConf(SparkConf sparkConf) throws Exception { - if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) { - Map<String, String> baseSparkConf = getSparkConfigOverride(config); - if (!baseSparkConf.isEmpty()) { - baseSparkConf.forEach(sparkConf::set); - String baseSparkConfStr = JsonUtil.writeValueAsString(baseSparkConf); - logger.info("Override user-defined spark conf: {}", baseSparkConfStr); - } - if (config.isAutoSetSparkConf()) { - logger.info("Set 
spark conf automatically."); - try { - autoSetSparkConf(sparkConf); - } catch (Exception e) { - logger.warn("Auto set spark conf failed. Load spark conf from system properties", e); - } - } - } - - atomicSparkConf.set(sparkConf); - } - - private void exchangeSparkSession(SparkConf sparkConf) { - SparkSession sparkSession = atomicSparkSession.get(); - if (Objects.nonNull(sparkSession)) { - // destroy previous spark session - destroySparkSession(); - } - - sparkSession = createSpark(sparkConf); - if (!config.isUTEnv() && !sparkConf.get("spark.master").startsWith("k8s")) { - getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/spark", project, jobId, - getTrackingInfo(sparkSession, config.isTrackingUrlIpAddressEnabled())); - } - - // for spark metrics - JobMetricsUtils.registerListener(sparkSession); - SparderEnv.registerListener(sparkSession.sparkContext()); - - //#8341 - SparderEnv.setSparkSession(sparkSession); - UdfManager.create(sparkSession); - - /// - atomicSparkSession.set(sparkSession); - } - - private void prepareSparkSession() throws NoRetryException { - SparkConf sparkConf = atomicSparkConf.get(); - if (Objects.isNull(sparkConf)) { - // shouldn't reach here - throw new NoRetryException("spark conf shouldn't be null"); - } - - /// SegmentBuildJob only!!! - if (config.isSnapshotSpecifiedSparkConf() && (this instanceof SegmentBuildJob)) { - // snapshot specified spark conf, based on the exchanged spark conf. - SparkConf clonedSparkConf = sparkConf.clone(); - Map<String, String> snapshotSparkConf = config.getSnapshotBuildingConfigOverride(); - snapshotSparkConf.forEach(clonedSparkConf::set); - logger.info("exchange sparkSession using snapshot specified sparkConf"); - exchangeSparkSession(clonedSparkConf); - return; - } - // normal logic - exchangeSparkSession(sparkConf); - } - - private void destroySparkSession() { - SparkSession sparkSession = atomicSparkSession.get(); - if (Objects.isNull(sparkSession)) { - logger.info("no initialized sparkSession instance"); - return; - } - if (sparkSession.conf().get("spark.master").startsWith("local")) { - // for UT use? but very strange for resource detect mode (spark local). - return; - } - JobMetricsUtils.unRegisterListener(sparkSession); - sparkSession.stop(); - } - - private void monitorSparkMaster() { - clusterMonitor = new ClusterMonitor(); - clusterMonitor.monitorSparkMaster(atomicBuildEnv, atomicSparkSession, atomicDisconnectSparkMasterTimes, - atomicUnreachableSparkMaster); - } - -} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java deleted file mode 100644 index 31974f65bc..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.spark.application; - -import org.apache.spark.application.JobWorkSpace; - -public class SparkEntry { - public static void main(String[] args) { - JobWorkSpace.execute(args); - } -} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java deleted file mode 100644 index 5411c31bbd..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.engine.spark.source; - -import static org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker.isNeedCreateHiveTemporaryTable; - -import java.util.List; -import java.util.Locale; -import java.util.Map; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.engine.spark.NSparkCubingEngine; -import org.apache.kylin.engine.spark.job.KylinBuildEnv; -import org.apache.kylin.engine.spark.utils.HiveTransactionTableHelper; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.util.SparderTypeUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; - -public class NSparkCubingSourceInput implements NSparkCubingEngine.NSparkCubingSource { - private static final Logger logger = LoggerFactory.getLogger(NSparkCubingSourceInput.class); - - @Override - public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> params) { - ColumnDesc[] columnDescs = table.getColumns(); - List<String> tblColNames = Lists.newArrayListWithCapacity(columnDescs.length); - StructType kylinSchema = new StructType(); - for (ColumnDesc columnDesc : columnDescs) { - if (!columnDesc.isComputedColumn()) { - kylinSchema = kylinSchema.add(columnDesc.getName(), - SparderTypeUtil.toSparkType(columnDesc.getType(), false), true); - tblColNames.add("`" + columnDesc.getName() + "`"); - } - } - String[] colNames = tblColNames.toArray(new String[0]); - String colString = Joiner.on(",").join(colNames); - String sql; - KylinConfig kylinConfig = KylinBuildEnv.get().kylinConfig(); - logger.info("isRangePartition:{};isTransactional:{};isReadTransactionalTableEnabled:{}", - table.isRangePartition(), table.isTransactional(), kylinConfig.isReadTransactionalTableEnabled()); - if (isNeedCreateHiveTemporaryTable(table.isRangePartition(), table.isTransactional(), - kylinConfig.isReadTransactionalTableEnabled())) { - sql = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(table, params, colString, - KylinBuildEnv.get()); - } else { - sql = String.format(Locale.ROOT, "select %s from %s", colString, table.getIdentity()); - } - Dataset<Row> df = ss.sql(sql); - StructType sparkSchema = df.schema(); - logger.debug("Source data sql is: {}", sql); - logger.debug("Kylin schema: {}", kylinSchema.treeString()); - return df.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema)); - } -} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java deleted file mode 100644 index 6a501bc71e..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.spark.source; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.metadata.model.IBuildable; -import org.apache.kylin.metadata.model.SegmentRange; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.IReadableTable; -import org.apache.kylin.source.ISampleDataDeployer; -import org.apache.kylin.source.ISource; -import org.apache.kylin.source.ISourceMetadataExplorer; -import org.apache.kylin.engine.spark.NSparkCubingEngine; - -public class NSparkDataSource implements ISource { - // for reflection - public NSparkDataSource(KylinConfig config) { - - } - - @Override - public ISourceMetadataExplorer getSourceMetadataExplorer() { - return new NSparkMetadataExplorer(); - } - - @Override - public <I> I adaptToBuildEngine(Class<I> engineInterface) { - if (engineInterface == NSparkCubingEngine.NSparkCubingSource.class) { - return (I) new NSparkCubingSourceInput(); - } else { - throw new IllegalArgumentException("Unsupported engine interface: " + engineInterface); - } - } - - @Override - public IReadableTable createReadableTable(TableDesc tableDesc) { - return new NSparkTable(tableDesc); - } - - @Override - public SegmentRange enrichSourcePartitionBeforeBuild(IBuildable buildable, SegmentRange srcPartition) { - return srcPartition; - } - - @Override - public ISampleDataDeployer getSampleDataDeployer() { - return new NSparkMetadataExplorer(); - } - - @Override - public SegmentRange getSegmentRange(String start, String end) { - start = StringUtils.isEmpty(start) ? "0" : start; - end = StringUtils.isEmpty(end) ? "" + Long.MAX_VALUE : end; - return new SegmentRange.TimePartitionedSegmentRange(Long.parseLong(start), Long.parseLong(end)); - } - - @Override - public boolean supportBuildSnapShotByPartition() { - return true; - } - -} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java deleted file mode 100644 index 7e4f7deea0..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.engine.spark.source; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Locale; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.kylin.common.KapConfig; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.RandomUtil; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.ISourceAware; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TableExtDesc; -import org.apache.kylin.source.ISampleDataDeployer; -import org.apache.kylin.source.ISourceMetadataExplorer; -import org.apache.kylin.metadata.model.NTableMetadataManager; -import org.apache.spark.sql.AnalysisException; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparderEnv; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalog.Database; -import org.apache.spark.sql.catalyst.catalog.CatalogTableType; -import org.apache.spark.sql.internal.SQLConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.clearspring.analytics.util.Lists; -import com.google.common.collect.Sets; - -import lombok.val; - -public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleDataDeployer, Serializable { - - private static final Logger logger = LoggerFactory.getLogger(NSparkMetadataExplorer.class); - - public static String generateCreateSchemaSql(String schemaName) { - return String.format(Locale.ROOT, "CREATE DATABASE IF NOT EXISTS %s", schemaName); - } - - public static String[] generateCreateTableSql(TableDesc tableDesc) { - String dropSql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity(); - - StringBuilder ddl = new StringBuilder(); - ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n"); - ddl.append("(" + "\n"); - - for (int i = 0; i < tableDesc.getColumns().length; i++) { - ColumnDesc col = tableDesc.getColumns()[i]; - if (i > 0) { - ddl.append(","); - } - ddl.append(col.getName() + " " + col.getDatatype() + "\n"); - } - - ddl.append(")" + "\n"); - ddl.append("USING com.databricks.spark.csv"); - - return new String[] { dropSql, ddl.toString() }; - } - - public NSparkTableMetaExplorer getTableMetaExplorer() { - return new NSparkTableMetaExplorer(); - } - - @Override - public List<String> listDatabases() throws Exception { - Dataset<Row> dataset = SparderEnv.getSparkSession().sql("show databases").select("namespace"); - return dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList()); - } - - @Override - public List<String> listTables(String database) throws Exception { - val ugi = UserGroupInformation.getCurrentUser(); - val config = KylinConfig.getInstanceFromEnv(); - val spark = SparderEnv.getSparkSession(); - - List<String> tables = Lists.newArrayList(); - try { - String sql = "show tables"; - if (StringUtils.isNotBlank(database)) { - sql = String.format(Locale.ROOT, sql + " in %s", database); - } - Dataset<Row> dataset = SparderEnv.getSparkSession().sql(sql).select("tableName"); - tables = 
dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList()); - - if (config.getTableAccessFilterEnable() && config.getKerberosProjectLevelEnable() - && UserGroupInformation.isSecurityEnabled()) { - List<String> accessTables = Lists.newArrayList(); - for (String table : tables) { - val tableName = database + "." + table; - if (checkTableAccess(tableName)) { - accessTables.add(table); - } - } - return accessTables; - } - } catch (Exception e) { - logger.error("List hive tables failed. user: {}, db: {}", ugi.getUserName(), database); - } - - return tables; - } - - public boolean checkTableAccess(String tableName) { - boolean isAccess = true; - try { - val spark = SparderEnv.getSparkSession(); - val sparkTable = spark.catalog().getTable(tableName); - Set<String> needCheckTables = Sets.newHashSet(); - if (sparkTable.tableType().equals(CatalogTableType.VIEW().name())) { - needCheckTables = SparkSqlUtil.getViewOrignalTables(tableName, SparderEnv.getSparkSession()); - } else { - needCheckTables.add(tableName); - } - String hiveSpecFsLocation = spark.sessionState().conf().getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION()); - FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem() - : HadoopUtil.getFileSystem(hiveSpecFsLocation); - for (String table : needCheckTables) { - fs.listStatus(new Path(getLoc(spark, table, hiveSpecFsLocation))); - } - } catch (Exception e) { - isAccess = false; - try { - logger.error("Read hive table {} error:{}, ugi name: {}.", tableName, e.getMessage(), - UserGroupInformation.getCurrentUser().getUserName()); - } catch (IOException ex) { - logger.error("fetch user curr ugi info error.", e); - } - } - return isAccess; - } - - @Override - public Pair<TableDesc, TableExtDesc> loadTableMetadata(final String database, String tableName, String prj) - throws Exception { - KylinConfig config = KylinConfig.getInstanceFromEnv(); - NTableMetadataManager metaMgr = NTableMetadataManager.getInstance(config, prj); - - NSparkTableMeta tableMeta = getTableMetaExplorer().getSparkTableMeta(database, tableName); - TableDesc tableDesc = metaMgr.getTableDesc(database + "." 
+ tableName); - - // make a new TableDesc instance, don't modify the one in use - if (tableDesc == null) { - tableDesc = new TableDesc(); - tableDesc.setDatabase(database.toUpperCase(Locale.ROOT)); - tableDesc.setName(tableName.toUpperCase(Locale.ROOT)); - tableDesc.setUuid(RandomUtil.randomUUIDStr()); - tableDesc.setLastModified(0); - } else { - tableDesc = new TableDesc(tableDesc); - } - - if (tableMeta.tableType != null) { - tableDesc.setTableType(tableMeta.tableType); - } - //set table type = spark - tableDesc.setSourceType(ISourceAware.ID_SPARK); - tableDesc.setTransactional(tableMeta.isTransactional); - tableDesc.setRangePartition(tableMeta.isRangePartition); - - Set<String> partColumnSet = Optional.ofNullable(tableMeta.partitionColumns) // - .orElseGet(Collections::emptyList).stream().map(field -> field.name) // - .collect(Collectors.toSet()); - int columnNumber = tableMeta.allColumns.size(); - List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber); - for (int i = 0; i < columnNumber; i++) { - NSparkTableMeta.SparkTableColumnMeta field = tableMeta.allColumns.get(i); - ColumnDesc cdesc = new ColumnDesc(); - cdesc.setName(field.name.toUpperCase(Locale.ROOT)); - cdesc.setCaseSensitiveName(field.name); - // use "double" in kylin for "float" - if ("float".equalsIgnoreCase(field.dataType)) { - cdesc.setDatatype("double"); - } else { - cdesc.setDatatype(field.dataType); - } - cdesc.setId(String.valueOf(i + 1)); - cdesc.setComment(field.comment); - cdesc.setPartitioned(partColumnSet.contains(field.name)); - columns.add(cdesc); - } - tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber])); - List<String> partCols = tableMeta.partitionColumns.stream().map(col -> col.name).collect(Collectors.toList()); - if (!partCols.isEmpty()) { - tableDesc.setPartitionColumn(partCols.get(0).toUpperCase(Locale.ROOT)); - } else { - tableDesc.setPartitionColumn(null); - } - StringBuilder partitionColumnBuilder = new StringBuilder(); - for (int i = 0, n = tableMeta.partitionColumns.size(); i < n; i++) { - if (i > 0) - partitionColumnBuilder.append(", "); - partitionColumnBuilder.append(tableMeta.partitionColumns.get(i).name.toUpperCase(Locale.ROOT)); - } - - TableExtDesc tableExtDesc = new TableExtDesc(); - tableExtDesc.setIdentity(tableDesc.getIdentity()); - tableExtDesc.setUuid(RandomUtil.randomUUIDStr()); - tableExtDesc.setLastModified(0); - tableExtDesc.init(prj); - - tableExtDesc.addDataSourceProp(TableExtDesc.LOCATION_PROPERTY_KEY, tableMeta.sdLocation); - tableExtDesc.addDataSourceProp("owner", tableMeta.owner); - tableExtDesc.addDataSourceProp("create_time", tableMeta.createTime); - tableExtDesc.addDataSourceProp("last_access_time", tableMeta.lastAccessTime); - tableExtDesc.addDataSourceProp("partition_column", partitionColumnBuilder.toString()); - tableExtDesc.addDataSourceProp("total_file_size", String.valueOf(tableMeta.fileSize)); - tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(tableMeta.fileNum)); - tableExtDesc.addDataSourceProp("hive_inputFormat", tableMeta.sdInputFormat); - tableExtDesc.addDataSourceProp("hive_outputFormat", tableMeta.sdOutputFormat); - tableExtDesc.addDataSourceProp(TableExtDesc.S3_ROLE_PROPERTY_KEY, tableMeta.s3Role); - tableExtDesc.addDataSourceProp(TableExtDesc.S3_ENDPOINT_KEY, tableMeta.s3Endpoint); - return Pair.newPair(tableDesc, tableExtDesc); - } - - @Override - public List<String> getRelatedKylinResources(TableDesc table) { - return Collections.emptyList(); - } - - @Override - public boolean checkDatabaseAccess(String 
database) throws Exception { - boolean hiveDBAccessFilterEnable = KapConfig.getInstanceFromEnv().getDBAccessFilterEnable(); - if (hiveDBAccessFilterEnable) { - logger.info("Check database {} access start.", database); - try { - Database db = SparderEnv.getSparkSession().catalog().getDatabase(database); - } catch (AnalysisException e) { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - logger.info("The current user: {} does not have permission to access database {}", ugi.getUserName(), - database); - return false; - } - } - - return true; - } - - @Override - public boolean checkTablesAccess(Set<String> tables) { - return tables.stream().allMatch(this::checkTableAccess); - } - - @Override - public Set<String> getTablePartitions(String database, String table, String prj, String partCol) { - return getTableMetaExplorer().checkAndGetTablePartitions(database, table, partCol); - } - - @Override - public void createSampleDatabase(String database) throws Exception { - SparderEnv.getSparkSession().sql(generateCreateSchemaSql(database)); - } - - @Override - public void createSampleTable(TableDesc table) throws Exception { - String[] createTableSqls = generateCreateTableSql(table); - for (String sql : createTableSqls) { - SparderEnv.getSparkSession().sql(sql); - } - } - - @Override - public void loadSampleData(String tableName, String tableFileDir) throws Exception { - Dataset<Row> dataset = SparderEnv.getSparkSession().read().csv(tableFileDir + "/" + tableName + ".csv").toDF(); - if (tableName.indexOf(".") > 0) { - tableName = tableName.substring(tableName.indexOf(".") + 1); - } - dataset.createOrReplaceTempView(tableName); - } - - @Override - public void createWrapperView(String origTableName, String viewName) throws Exception { - throw new UnsupportedOperationException("unsupport create wrapper view"); - } - - public String getLoc(SparkSession spark, String table, String hiveSpecFsLocation) { - String loc = spark.sql("desc formatted " + table).where("col_name == 'Location'").head().getString(1); - if (null == hiveSpecFsLocation || null == loc) { - return loc; - } - return loc.replace("hdfs://hacluster", hiveSpecFsLocation); - } -} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java deleted file mode 100644 index 6ab6643154..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.engine.spark.source; - -import java.io.IOException; -import java.util.Locale; - -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.IReadableTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class NSparkTable implements IReadableTable { - - private static final Logger logger = LoggerFactory.getLogger(NSparkTable.class); - - final private String database; - final private String tableName; - - public NSparkTable(TableDesc tableDesc) { - this.database = tableDesc.getDatabase(); - this.tableName = tableDesc.getName(); - } - - @Override - public TableReader getReader() throws IOException { - return new NSparkTableReader(database, tableName); - } - - @Override - public TableSignature getSignature() throws IOException { - // TODO: 07/12/2017 get modify time - String path = String.format(Locale.ROOT, "%s.%s", database, tableName); - long lastModified = System.currentTimeMillis(); // assume table is ever changing - int size = 0; - return new TableSignature(path, size, lastModified); - } - - @Override - public boolean exists() { - return true; - } - - @Override - public String toString() { - return "spark: database=[" + database + "], table=[" + tableName + "]"; - } - -} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java deleted file mode 100644 index 29f48bfbfc..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.engine.spark.source; - -import java.util.List; - -public class NSparkTableMeta { - public static class SparkTableColumnMeta { - String name; - String dataType; - String comment; - - public SparkTableColumnMeta(String name, String dataType, String comment) { - this.name = name; - this.dataType = dataType; - this.comment = comment; - } - - @Override - public String toString() { - return "SparkTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='" - + comment + '\'' + '}'; - } - - public String getName() { - return name; - } - - public String getDataType() { - return dataType; - } - - public String getComment() { - return comment; - } - } - - String tableName; - String sdLocation;//sd is short for storage descriptor - String sdInputFormat; - String sdOutputFormat; - String owner; - String provider; - String tableType; - String createTime; - String lastAccessTime; - long fileSize; - long fileNum; - boolean isNative; - List<SparkTableColumnMeta> allColumns; - List<SparkTableColumnMeta> partitionColumns; - boolean isTransactional; - boolean isRangePartition; - String s3Role; - String s3Endpoint; - - public List<SparkTableColumnMeta> getAllColumns() { - return allColumns; - } - - public NSparkTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, - String owner, String provider, String tableType, String createTime, String lastAccessTime, long fileSize, - long fileNum, boolean isNative, List<SparkTableColumnMeta> allColumns, - List<SparkTableColumnMeta> partitionColumns, boolean isTransactional, boolean isRangePartition, - String s3Role, String s3Endpoint) { - this.tableName = tableName; - this.sdLocation = sdLocation; - this.sdInputFormat = sdInputFormat; - this.sdOutputFormat = sdOutputFormat; - this.owner = owner; - this.provider = provider; - this.tableType = tableType; - this.createTime = createTime; - this.lastAccessTime = lastAccessTime; - this.fileSize = fileSize; - this.fileNum = fileNum; - this.isNative = isNative; - this.allColumns = allColumns; - this.partitionColumns = partitionColumns; - this.isTransactional = isTransactional; - this.isRangePartition = isRangePartition; - this.s3Role = s3Role; - this.s3Endpoint = s3Endpoint; - } - - @Override - public String toString() { - return "SparkTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\'' - + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\'' - + ", owner='" + owner + ", provider='" + provider + '\'' + ", tableType='" + tableType - + ", createTime='" + createTime + '\'' + ", lastAccessTime=" + lastAccessTime + ", fileSize=" + fileSize - + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" + allColumns - + ", partitionColumns=" + partitionColumns + ", isTransactional=" + isTransactional - + ", isRangePartition=" + isRangePartition + ", s3Role=" + s3Role + ", s3Endpoint=" + s3Endpoint + '}'; - } -} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java deleted file mode 100644 index 2d4a9eeae1..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.spark.source; - -import java.util.List; - -import com.google.common.collect.Lists; - -public class NSparkTableMetaBuilder { - private String tableName; - private String sdLocation; - private String sdInputFormat; - private String sdOutputFormat; - private String owner; - private String provider; - private String tableType; - private String createTime; - private String lastAccessTime; - private long fileSize; - private long fileNum; - private boolean isNative = true; - private List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayList(); - private List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns = Lists.newArrayList(); - private boolean isTransactional = false; - private boolean isRangePartition = false; - private String s3Role; - private String s3Endpoint; - - public NSparkTableMetaBuilder setTableName(String tableName) { - this.tableName = tableName; - return this; - } - - public NSparkTableMetaBuilder setSdLocation(String sdLocation) { - this.sdLocation = sdLocation; - return this; - } - - public NSparkTableMetaBuilder setSdInputFormat(String sdInputFormat) { - this.sdInputFormat = sdInputFormat; - return this; - } - - public NSparkTableMetaBuilder setSdOutputFormat(String sdOutputFormat) { - this.sdOutputFormat = sdOutputFormat; - return this; - } - - public NSparkTableMetaBuilder setOwner(String owner) { - this.owner = owner; - return this; - } - - public NSparkTableMetaBuilder setProvider(String provider) { - this.provider = provider; - return this; - } - - public NSparkTableMetaBuilder setTableType(String tableType) { - this.tableType = tableType; - return this; - } - - public NSparkTableMetaBuilder setCreateTime(String createTime) { - this.createTime = createTime; - return this; - } - - public NSparkTableMetaBuilder setLastAccessTime(String lastAccessTime) { - this.lastAccessTime = lastAccessTime; - return this; - } - - public NSparkTableMetaBuilder setFileSize(long fileSize) { - this.fileSize = fileSize; - return this; - } - - public NSparkTableMetaBuilder setFileNum(long fileNum) { - this.fileNum = fileNum; - return this; - } - - public NSparkTableMetaBuilder setIsNative(boolean isNative) { - this.isNative = isNative; - return this; - } - - public NSparkTableMetaBuilder setAllColumns(List<NSparkTableMeta.SparkTableColumnMeta> allColumns) { - this.allColumns = allColumns; - return this; - } - - public NSparkTableMetaBuilder setPartitionColumns(List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns) { - this.partitionColumns = partitionColumns; - return this; - } - - public NSparkTableMetaBuilder setIsTransactional(boolean isTransactional) { - this.isTransactional = isTransactional; - return this; - } - - public NSparkTableMetaBuilder setIsRangePartition(boolean isRangePartition) { - this.isRangePartition = isRangePartition; 
- return this; - } - - public NSparkTableMetaBuilder setS3Role(String s3Role) { - this.s3Role = s3Role; - return this; - } - - public NSparkTableMetaBuilder setS3Endpoint(String s3Endpoint) { - this.s3Endpoint = s3Endpoint; - return this; - } - - public NSparkTableMeta createSparkTableMeta() { - return new NSparkTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, provider, tableType, - createTime, lastAccessTime, fileSize, fileNum, isNative, allColumns, partitionColumns, isTransactional, - isRangePartition, s3Role, s3Endpoint); - } -} \ No newline at end of file diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java deleted file mode 100644 index 033edb84f9..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.engine.spark.source; - -import java.io.Serializable; -import java.net.URI; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.commons.jnet.Installer; -import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; -import org.apache.spark.sql.SparderEnv; -import org.apache.spark.sql.catalyst.TableIdentifier; -import org.apache.spark.sql.catalyst.catalog.CatalogTable; -import org.apache.spark.sql.catalyst.catalog.CatalogTableType; -import org.apache.spark.sql.catalyst.catalog.SessionCatalog; -import org.apache.spark.sql.types.StructType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import scala.Option; -import scala.collection.JavaConversions; - -public class NSparkTableMetaExplorer implements Serializable { - - private static final Logger logger = LoggerFactory.getLogger(NSparkTableMetaExplorer.class); - - enum PROVIDER { - HIVE("hive"), UNSPECIFIED(""); - - private static final PROVIDER[] ALL = new PROVIDER[] { HIVE }; - private String value; - - PROVIDER(String value) { - this.value = value; - } - - public static PROVIDER fromString(Option<String> value) { - if (value.isEmpty()) { - return UNSPECIFIED; - } - - for (PROVIDER provider : ALL) { - if (provider.value.equals(value.get())) { - return provider; - } - } - return UNSPECIFIED; - } - } - - private static final List<String> UNSUPOORT_TYPE = Lists.newArrayList("array", "map", "struct", "binary"); - private static final String CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"; - public static final String S3_ROLE_PROPERTY_KEY = "role"; - public static final String S3_ENDPOINT_PROPERTY_KEY = "s3_endpoint"; - - public NSparkTableMeta getSparkTableMeta(String database, String tableName) { - SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog(); - TableIdentifier tableIdentifier = TableIdentifier.apply(tableName, - Option.apply(database.isEmpty() ? null : database)); - CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier); - checkTableIsValid(tableMetadata, tableIdentifier, tableName); - return getSparkTableMeta(tableName, tableMetadata); - } - - public Set<String> checkAndGetTablePartitions(String database, String tableName, String partitionCol) { - SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog(); - TableIdentifier tableIdentifier = TableIdentifier.apply(tableName, - Option.apply(database.isEmpty() ? null : database)); - - CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier); - - String firstPartCol = tableMetadata.partitionColumnNames().isEmpty() ? 
null - : tableMetadata.partitionColumnNames().head().toLowerCase(Locale.ROOT); - - if (!partitionCol.equalsIgnoreCase(firstPartCol)) { - throw new IllegalArgumentException( - String.format(Locale.ROOT, "table partition col %s not match col %s", firstPartCol, partitionCol)); - } - return JavaConversions.seqAsJavaList(catalog.listPartitions(tableIdentifier, Option.empty())).stream() - .map(item -> JavaConversions.mapAsJavaMap(item.spec()).entrySet().stream() - .filter(entry -> partitionCol.equalsIgnoreCase(entry.getKey())) // - .findFirst() // - .map(Map.Entry::getValue) // - .orElse(null)) - .filter(Objects::nonNull).collect(Collectors.toSet()); - } - - private NSparkTableMeta getSparkTableMeta(String tableName, CatalogTable tableMetadata) { - NSparkTableMetaBuilder builder = new NSparkTableMetaBuilder(); - builder.setTableName(tableName); - builder.setAllColumns(getColumns(tableMetadata, tableMetadata.schema())); - builder.setOwner(tableMetadata.owner()); - builder.setCreateTime(tableMetadata.createTime() + ""); - builder.setLastAccessTime(tableMetadata.lastAccessTime() + ""); - builder.setTableType(tableMetadata.tableType().name()); - builder.setPartitionColumns(getColumns(tableMetadata, tableMetadata.partitionSchema())); - builder.setIsRangePartition(isRangePartition(tableMetadata)); - if (tableMetadata.storage().inputFormat().isDefined()) { - builder.setSdInputFormat(tableMetadata.storage().inputFormat().get()); - } - if (tableMetadata.storage().outputFormat().isDefined()) { - builder.setSdOutputFormat(tableMetadata.storage().outputFormat().get()); - } - Option<URI> uriOption = tableMetadata.storage().locationUri(); - if (uriOption.isDefined()) { - builder.setSdLocation(uriOption.get().toString()); - } - if (tableMetadata.provider().isDefined()) { - builder.setProvider(tableMetadata.provider().get()); - } - if (tableMetadata.properties().contains("totalSize")) { - builder.setFileSize(Long.parseLong(tableMetadata.properties().get("totalSize").get())); - } - if (tableMetadata.properties().contains("numFiles")) { - builder.setFileNum(Long.parseLong(tableMetadata.properties().get("numFiles").get())); - } - if (tableMetadata.properties().contains("transactional")) { - builder.setIsTransactional(Boolean.parseBoolean(tableMetadata.properties().get("transactional").get())); - } - if (tableMetadata.properties().contains(S3_ROLE_PROPERTY_KEY)) { - builder.setS3Role(tableMetadata.properties().get(S3_ROLE_PROPERTY_KEY).get()); - } - - if (tableMetadata.properties().contains(S3_ENDPOINT_PROPERTY_KEY)) { - builder.setS3Endpoint(tableMetadata.properties().get(S3_ENDPOINT_PROPERTY_KEY).get()); - } - return builder.createSparkTableMeta(); - } - - private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema) { - return getColumns(tableMetadata, schema, true); - } - - private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema, - boolean isCheckRepeatColumn) { - List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayListWithCapacity(schema.size()); - Set<String> columnCacheTemp = Sets.newHashSet(); - for (org.apache.spark.sql.types.StructField field : schema.fields()) { - String type = field.dataType().simpleString(); - if (field.metadata().contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) { - type = field.metadata().getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY); - } - String finalType = type; - if (UNSUPOORT_TYPE.stream().anyMatch(finalType::contains)) { - logger.info("Load table {} ignore 
column {}:{}", tableMetadata.identifier().identifier(), field.name(), - finalType); - continue; - } - if (isCheckRepeatColumn && columnCacheTemp.contains(field.name())) { - logger.info("The【{}】column is already included and does not need to be added again", field.name()); - continue; - } - columnCacheTemp.add(field.name()); - allColumns.add(new NSparkTableMeta.SparkTableColumnMeta(field.name(), type, - field.getComment().isDefined() ? field.getComment().get() : null)); - } - - return allColumns; - } - - private void checkTableIsValid(CatalogTable tableMetadata, TableIdentifier tableIdentifier, String tableName) { - if (CatalogTableType.VIEW().equals(tableMetadata.tableType())) { - try { - Installer.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); - SparderEnv.getSparkSession().table(tableIdentifier).queryExecution().analyzed(); - } catch (Throwable e) { - logger.error("Error for parser view: " + tableName, e); - throw new RuntimeException("Error for parser view: " + tableName + ", " + e.getMessage() - + "(There are maybe syntactic differences between HIVE and SparkSQL)", e); - } - } - } - - private Boolean isRangePartition(CatalogTable tableMetadata) { - List<NSparkTableMeta.SparkTableColumnMeta> allColumns = getColumns(tableMetadata, tableMetadata.schema(), - false); - return allColumns.stream().collect(Collectors.groupingBy(p -> p.name)).values().stream() - .anyMatch(p -> p.size() > 1); - } -} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java deleted file mode 100644 index 669f418c9c..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.engine.spark.source; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; - -import org.apache.kylin.source.IReadableTable.TableReader; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparderEnv; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.types.StructField; - -public class NSparkTableReader implements TableReader { - private String dbName; - private String tableName; - private SparkSession ss; - private List<Row> records; - private Iterator<Row> iterator; - private Row currentRow; - - public NSparkTableReader(String dbName, String tableName) { - this.dbName = dbName; - this.tableName = tableName; - initialize(); - } - - public static String[] getRowAsStringArray(Row record) { - StructField[] fields = record.schema().fields(); - String[] arr = new String[fields.length]; - for (int i = 0; i < arr.length; i++) { - Object o = record.get(i); - arr[i] = (o == null) ? null : o.toString(); - } - return arr; - } - - private void initialize() { - ss = SparderEnv.getSparkSession(); - String master = ss.sparkContext().master(); - String tableIdentity = tableName; - // spark sql can not add the database prefix when create tempView from csv, but when working with hive, it need the database prefix - if (!master.toLowerCase(Locale.ROOT).contains("local")) { - tableIdentity = String.format(Locale.ROOT, "%s.%s", dbName, tableName); - } - records = SparkSqlUtil.queryAll(ss, tableIdentity); - iterator = records.iterator(); - } - - @Override - public boolean next() throws IOException { - boolean hasNext = iterator != null && iterator.hasNext(); - if (hasNext) { - currentRow = iterator.next(); - } - return hasNext; - } - - @Override - public String[] getRow() { - return getRowAsStringArray(currentRow); - } - - @Override - public void close() throws IOException { - this.records = null; - this.iterator = null; - this.currentRow = null; - } -} diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java deleted file mode 100644 index cea86bc9fa..0000000000 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.engine.spark.source; - -import java.util.List; -import java.util.Locale; -import java.util.Set; - -import org.apache.spark.sql.AnalysisException; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; -import org.apache.spark.sql.catalyst.catalog.CatalogTableType; - -import com.google.common.collect.Sets; - -import lombok.val; - -public class SparkSqlUtil { - public static Dataset<Row> query(SparkSession ss, String sql) { - return ss.sql(sql); - } - - public static List<Row> queryForList(SparkSession ss, String sql) { - return ss.sql(sql).collectAsList(); - } - - public static List<Row> queryAll(SparkSession ss, String table) { - String sql = String.format(Locale.ROOT, "select * from %s", table); - return queryForList(ss, sql); - } - - public static Set<String> getViewOrignalTables(String viewName, SparkSession spark) throws AnalysisException { - String viewText = spark.sql("desc formatted " + viewName).where("col_name = 'View Text'").head().getString(1); - val logicalPlan = spark.sessionState().sqlParser().parsePlan(viewText); - Set<String> viewTables = Sets.newHashSet(); - for (Object l : scala.collection.JavaConverters.seqAsJavaListConverter(logicalPlan.collectLeaves()).asJava()) { - if (l instanceof UnresolvedRelation) { - val tableName = ((UnresolvedRelation) l).tableName(); - //if nested view - if (spark.catalog().getTable(tableName).tableType().equals(CatalogTableType.VIEW().name())) { - viewTables.addAll(getViewOrignalTables(tableName, spark)); - } else { - viewTables.add(tableName); - } - } - } - return viewTables; - } -}
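For context on what this commit removes, here is a minimal sketch of how the deleted NSparkTableMetaBuilder composed an NSparkTableMeta. It only exercises the public builder setters and the createSparkTableMeta() call shown in the diff above; the table name, location, and column values are hypothetical and not taken from the commit.

// Illustrative sketch only: uses the classes removed in this commit; all values are made up.
import org.apache.kylin.engine.spark.source.NSparkTableMeta;
import org.apache.kylin.engine.spark.source.NSparkTableMetaBuilder;

import com.google.common.collect.Lists;

public class TableMetaSketch {
    public static void main(String[] args) {
        NSparkTableMeta meta = new NSparkTableMetaBuilder()
                .setTableName("SSB.CUSTOMER")                          // hypothetical table
                .setSdLocation("hdfs:///warehouse/ssb.db/customer")    // hypothetical location
                .setProvider("hive")
                .setTableType("MANAGED")
                .setAllColumns(Lists.newArrayList(
                        new NSparkTableMeta.SparkTableColumnMeta("C_CUSTKEY", "bigint", null),
                        new NSparkTableMeta.SparkTableColumnMeta("C_NAME", "varchar(25)", "customer name")))
                .setIsTransactional(false)
                .createSparkTableMeta();
        System.out.println(meta); // prints via the removed NSparkTableMeta.toString()
    }
}

The builder defaults (isNative = true, empty column lists, false flags) mean callers such as the removed NSparkTableMetaExplorer only needed to set the fields actually present in the catalog metadata.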
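The removed NSparkTableMetaExplorer.isRangePartition treats a table as range-partitioned when the same column name occurs more than once in the catalog schema it reads (duplicate-column filtering is disabled for that pass). Below is a self-contained sketch of that grouping check in plain Java, using hypothetical column names rather than a real Spark CatalogTable.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RangePartitionCheckSketch {
    public static void main(String[] args) {
        // Hypothetical column names; "CAL_DT" appears twice, as it might when a
        // column shows up both as a data column and a partition column.
        List<String> columnNames = Arrays.asList("TRANS_ID", "PRICE", "CAL_DT", "CAL_DT");

        // Same grouping idea as the removed isRangePartition: group by name and
        // report true if any name occurs more than once.
        boolean hasDuplicateName = columnNames.stream()
                .collect(Collectors.groupingBy(name -> name))
                .values().stream()
                .anyMatch(group -> group.size() > 1);

        System.out.println(hasDuplicateName); // prints true
    }
}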