This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new 662a7f9  KYLIN-3795 Submit Spark jobs via Apache Livy
662a7f9 is described below

commit 662a7f95104b3e8c31026cef65192127a2ab2e15
Author: javalife0312 <javalife0...@126.com>
AuthorDate: Mon Mar 4 10:58:29 2019 +0800

    KYLIN-3795 Submit Spark jobs via Apache Livy
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  24 ++
 .../apache/kylin/common/livy/LivyRestBuilder.java  | 155 ++++++++++++
 .../apache/kylin/common/livy/LivyRestClient.java   | 136 ++++++++++
 .../apache/kylin/common/livy/LivyRestExecutor.java | 122 +++++++++
 .../apache/kylin/common/livy/LivyStateEnum.java    |  25 ++
 .../org/apache/kylin/common/livy/LivyTypeEnum.java |  25 ++
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |   4 +-
 .../engine/spark/SparkBatchMergeJobBuilder2.java   |   4 +-
 .../apache/kylin/engine/spark/SparkExecutable.java |   6 +-
 .../kylin/engine/spark/SparkExecutableFactory.java |  32 +++
 .../kylin/engine/spark/SparkExecutableLivy.java    | 276 +++++++++++++++++++++
 .../kylin/engine/spark/SparkSqlOnLivyBatch.scala   |  52 ++++
 .../source/hive/CreateFlatHiveTableByLivyStep.java | 124 +++++++++
 .../apache/kylin/source/hive/HiveInputBase.java    |  46 +++-
 .../hive/RedistributeFlatHiveTableByLivyStep.java  | 149 +++++++++++
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |   3 +-
 16 files changed, 1172 insertions(+), 11 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 6457d0a..81979dc 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1372,7 +1372,31 @@ public abstract class KylinConfigBase implements Serializable { public boolean isSparkSanityCheckEnabled() { return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", FALSE)); } + + // ============================================================================ + // ENGINE.LIVY + // ============================================================================ + + public boolean enableLivy() { + return getOptional("kylin.engine.livy-conf.livy.enable", "false").equalsIgnoreCase("true") ? true : false; + } + + public String getLivyUrl() { + return getOptional("kylin.engine.livy-conf.livy.url"); + } + public Map<String, String> getLivyKey() { + return getPropertiesByPrefix("kylin.engine.livy-conf.livy-key."); + } + + public Map<String, String> getLivyArr() { + return getPropertiesByPrefix("kylin.engine.livy-conf.livy-arr."); + } + + public Map<String, String> getLivyMap() { + return getPropertiesByPrefix("kylin.engine.livy-conf.livy-map."); + } + // ============================================================================ // QUERY // ============================================================================ diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestBuilder.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestBuilder.java new file mode 100644 index 0000000..218b4e4 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestBuilder.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.livy; + +import com.google.common.collect.Lists; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.SourceConfigurationUtil; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Map; + +/** + * + */ +public class LivyRestBuilder { + protected static final org.slf4j.Logger logger = LoggerFactory.getLogger(LivyRestBuilder.class); + + final private KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + final private Map<String, String> hiveConfProps = SourceConfigurationUtil.loadHiveConfiguration(); + + private String url; + private LivyTypeEnum livyTypeEnum; + + private Map<String, String> keys; + private Map<String, String> arrs; + private Map<String, String> maps; + + private ArrayList<String> args = new ArrayList<>(); + + public LivyRestBuilder() { + url = kylinConfig.getLivyUrl(); + + keys = kylinConfig.getLivyKey(); + arrs = kylinConfig.getLivyArr(); + maps = kylinConfig.getLivyMap(); + } + + public String build() throws JSONException { + try { + + JSONObject postJson = new JSONObject(); + + if (LivyTypeEnum.sql.equals(livyTypeEnum)) { + postJson.put("className", "org.apache.kylin.engine.spark.SparkSqlOnLivyBatch"); + postJson.put("args", args); + } else if (LivyTypeEnum.job.equals(livyTypeEnum)) { + postJson.put("className", "org.apache.kylin.common.util.SparkEntry"); + postJson.put("args", args); + } else { + throw new IllegalArgumentException("unSupport livy type."); + } + + //deal conf of key + keys.forEach((key, value) -> { + try { + postJson.put(key, value); + } catch (JSONException e) { + e.printStackTrace(); + } + }); + + //deal conf of arr + arrs.forEach((key, value) -> { + try { + postJson.put(key, Lists.newArrayList(value.split(","))); + } catch (JSONException e) { + e.printStackTrace(); + } + }); + + //deal conf of map + JSONObject confJson = new JSONObject(); + maps.forEach((key, value) -> { + try { + confJson.put(key, value); + } catch (JSONException e) { + e.printStackTrace(); + } + }); + postJson.put("conf", confJson); + + return postJson.toString(); + } catch (JSONException e) { + e.printStackTrace(); + throw new JSONException("create livy json error :" + e.getMessage()); + } + } + + public void overwriteHiveProps(Map<String, String> overwrites) { + this.hiveConfProps.putAll(overwrites); + } + + public String parseProps() { + StringBuilder s = new StringBuilder(); + for (Map.Entry<String, String> prop : hiveConfProps.entrySet()) { + s.append("set "); + s.append(prop.getKey()); + s.append("="); + s.append(prop.getValue()); + s.append("; \n"); + } + return s.toString(); + } + + public void addArgs(String arg) { + this.args.add(arg); + } + + public void addConf(String key, String value) { + this.maps.put(key, value); + } + + public String getUrl() { + return url; + } + + public void setUrl(String 
url) { + this.url = url; + } + + public ArrayList<String> getArgs() { + return args; + } + + public void setArgs(ArrayList<String> args) { + this.args = args; + } + + public LivyTypeEnum getLivyTypeEnum() { + return this.livyTypeEnum; + } + + public void setLivyTypeEnum(LivyTypeEnum livyTypeEnum) { + this.livyTypeEnum = livyTypeEnum; + } +} diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java new file mode 100644 index 0000000..978b99d --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.livy; + +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.impl.conn.PoolingClientConnectionManager; +import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.HttpConnectionParams; +import org.apache.http.params.HttpParams; +import org.apache.kylin.common.KylinConfig; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * + */ +public class LivyRestClient { + + private int httpConnectionTimeoutMs = 30000; + private int httpSocketTimeoutMs = 120000; + + protected String baseUrl; + protected DefaultHttpClient client; + + final private KylinConfig config = KylinConfig.getInstanceFromEnv(); + + public LivyRestClient() { + init(); + } + + + private void init() { + final HttpParams httpParams = new BasicHttpParams(); + HttpConnectionParams.setSoTimeout(httpParams, httpSocketTimeoutMs); + HttpConnectionParams.setConnectionTimeout(httpParams, httpConnectionTimeoutMs); + + PoolingClientConnectionManager cm = new PoolingClientConnectionManager(); + cm.setDefaultMaxPerRoute(config.getRestClientDefaultMaxPerRoute()); + cm.setMaxTotal(config.getRestClientMaxTotal()); + + baseUrl = config.getLivyUrl(); + client = new DefaultHttpClient(cm, httpParams); + } + + public String livySubmitJobBatches(String jobJson) throws IOException { + String url = baseUrl + "/batches"; + HttpPost post = newPost(url); + + post.setEntity(new StringEntity(jobJson, "UTF-8")); + HttpResponse response = client.execute(post); + String content = getContent(response); + if (response.getStatusLine().getStatusCode() != 201) { + throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " 
+ url + "\n"); + } + return content; + } + + + public String livyGetJobStatusBatches(String jobId) throws IOException { + String url = baseUrl + "/batches/" + jobId; + HttpGet get = new HttpGet(url); + + HttpResponse response = client.execute(get); + String content = getContent(response); + if (response.getStatusLine().getStatusCode() != 200) { + throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n"); + } + return content; + } + + public String livyDeleteBatches(String jobId) throws IOException { + String url = baseUrl + "/batches/" + jobId; + HttpDelete delete = new HttpDelete(url); + + HttpResponse response = client.execute(delete); + String content = getContent(response); + if (response.getStatusLine().getStatusCode() != 200) { + throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n"); + } + return content; + } + + private HttpPost newPost(String url) { + HttpPost post = new HttpPost(url); + addHttpHeaders(post); + return post; + } + + private void addHttpHeaders(HttpRequestBase method) { + method.addHeader("Accept", "application/json, text/plain, */*"); + method.addHeader("Content-Type", "application/json"); + } + + private String getContent(HttpResponse response) throws IOException { + InputStreamReader reader = null; + BufferedReader rd = null; + StringBuffer result = new StringBuffer(); + try { + reader = new InputStreamReader(response.getEntity().getContent(), "UTF-8"); + rd = new BufferedReader(reader); + String line; + while ((line = rd.readLine()) != null) { + result.append(line); + } + } finally { + IOUtils.closeQuietly(reader); + IOUtils.closeQuietly(rd); + } + return result.toString(); + } +} diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestExecutor.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestExecutor.java new file mode 100644 index 0000000..03d66aa --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestExecutor.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.common.livy; + +import org.apache.kylin.common.util.Logger; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class LivyRestExecutor { + + protected static final org.slf4j.Logger logger = LoggerFactory.getLogger(LivyRestExecutor.class); + + public void execute(LivyRestBuilder livyRestBuilder, Logger logAppender) { + try { + long startTime = System.currentTimeMillis(); + + String dataJson = livyRestBuilder.build(); + + logAppender.log("Livy submit Json: "); + logAppender.log(dataJson + "\n"); + + LivyRestClient restClient = new LivyRestClient(); + String result = restClient.livySubmitJobBatches(dataJson); + + JSONObject resultJson = new JSONObject(result); + String state = resultJson.getString("state"); + logAppender.log("Livy submit Result: " + state); + logger.info("Livy submit Result: {}", state); + + livyLog(resultJson, logAppender); + + final String livyTaskId = resultJson.getString("id"); + while (!LivyStateEnum.shutting_down.toString().equalsIgnoreCase(state) + && !LivyStateEnum.error.toString().equalsIgnoreCase(state) + && !LivyStateEnum.dead.toString().equalsIgnoreCase(state) + && !LivyStateEnum.success.toString().equalsIgnoreCase(state)) { + + String statusResult = restClient.livyGetJobStatusBatches(livyTaskId); + JSONObject stateJson = new JSONObject(statusResult); + if (!state.equalsIgnoreCase(stateJson.getString("state"))) { + logAppender.log("Livy status Result: " + stateJson.getString("state")); + livyLog(stateJson, logAppender); + } + state = stateJson.getString("state"); + Thread.sleep(10*1000); + } + if (!LivyStateEnum.success.toString().equalsIgnoreCase(state)) { + logAppender.log("livy start execute failed. state is " + state); + logger.info("livy start execute failed. state is {}", state); + throw new RuntimeException("livy get status failed. state is " + state); + } + logAppender.log("costTime : " + (System.currentTimeMillis() - startTime) / 1000 + " s"); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException("livy execute failed. \n" + e.getMessage()); + } + } + + public String state(String batchId) { + try { + LivyRestClient restClient = new LivyRestClient(); + String statusResult = restClient.livyGetJobStatusBatches(batchId); + JSONObject stateJson = new JSONObject(statusResult); + return stateJson.getString("state"); + } catch (Exception e) { + e.printStackTrace(); + return ""; + } + } + + public Boolean kill(String batchId) { + try { + LivyRestClient restClient = new LivyRestClient(); + String statusResult = restClient.livyDeleteBatches(batchId); + JSONObject stateJson = new JSONObject(statusResult); + return stateJson.getString("msg").equalsIgnoreCase("deleted")? 
true: false; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + } + + private void livyLog(JSONObject logInfo, Logger logger) { + if (logInfo.has("log")) { + try { + JSONArray logArray = logInfo.getJSONArray("log"); + + for (int i=0;i<logArray.length();i++) { + String info = logArray.getString(i); + logger.log(info); + } + + } catch (JSONException e) { + e.printStackTrace(); + } + logInfo.remove("log"); + logger.log(logInfo.toString()); + } + } + +} diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyStateEnum.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyStateEnum.java new file mode 100644 index 0000000..30c7188 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyStateEnum.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.livy; + +/** + */ +public enum LivyStateEnum { + starting, running, success, dead, error, not_started, idle, busy, shutting_down; +} diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyTypeEnum.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyTypeEnum.java new file mode 100644 index 0000000..e4a44b1 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyTypeEnum.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.common.livy; + +/** + */ +public enum LivyTypeEnum { + sql, job; +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 3f3c14d..426a73f 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -88,7 +88,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport { } public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) { - final SparkExecutable sparkExecutable = new SparkExecutable(); + final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy()); final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg); final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId)); @@ -115,7 +115,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport { } protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { - final SparkExecutable sparkExecutable = new SparkExecutable(); + final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy()); sparkExecutable.setClassName(SparkCubingByLayer.class.getName()); configureSparkJob(seg, sparkExecutable, jobId, cuboidRootPath); result.addTask(sparkExecutable); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java index eb67fef..21599ff 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java @@ -77,7 +77,7 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport { } public SparkExecutable createMergeDictionaryStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) { - final SparkExecutable sparkExecutable = new SparkExecutable(); + final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy()); sparkExecutable.setClassName(SparkMergingDictionary.class.getName()); sparkExecutable.setParam(SparkMergingDictionary.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); @@ -108,7 +108,7 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport { String formattedPath = StringUtil.join(mergingCuboidPaths, ","); String outputPath = getCuboidRootPath(jobID); - final SparkExecutable sparkExecutable = new SparkExecutable(); + final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy()); sparkExecutable.setClassName(SparkCubingMerge.class.getName()); sparkExecutable.setParam(SparkCubingMerge.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); sparkExecutable.setParam(SparkCubingMerge.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index 8c4b99d..8535212 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -349,7 +349,7 @@ public class 
SparkExecutable extends AbstractExecutable { } } - private void dumpMetadata(CubeSegment segment, List<CubeSegment> mergingSeg) throws ExecuteException { + protected void dumpMetadata(CubeSegment segment, List<CubeSegment> mergingSeg) throws ExecuteException { try { if (mergingSeg == null || mergingSeg.size() == 0) { attachSegmentMetadataWithDict(segment); @@ -365,7 +365,7 @@ public class SparkExecutable extends AbstractExecutable { } // Spark Cubing can only work in layer algorithm - private void setAlgorithmLayer() { + protected void setAlgorithmLayer() { ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); CubingJob cubingJob = (CubingJob) execMgr.getJob(this.getParam(JOB_ID)); cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER); @@ -443,7 +443,7 @@ public class SparkExecutable extends AbstractExecutable { this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt())); } - private void readCounters(final Map<String, String> info) { + protected void readCounters(final Map<String, String> info) { String counter_save_as = getCounterSaveAs(); if (counter_save_as != null) { String[] saveAsNames = counter_save_as.split(","); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableFactory.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableFactory.java new file mode 100644 index 0000000..bc59c12 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.engine.spark; + +import org.slf4j.LoggerFactory; + +/** + */ +public class SparkExecutableFactory { + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SparkExecutableFactory.class); + + public static SparkExecutable instance(boolean livy) { + return livy ? new SparkExecutableLivy() : new SparkExecutable(); + } + +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java new file mode 100644 index 0000000..d512104 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutableLivy.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.engine.spark; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.livy.LivyRestBuilder; +import org.apache.kylin.common.livy.LivyRestExecutor; +import org.apache.kylin.common.livy.LivyStateEnum; +import org.apache.kylin.common.livy.LivyTypeEnum; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.job.common.PatternedLogger; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.job.execution.Output; +import org.apache.kylin.metadata.model.Segments; +import org.apache.parquet.Strings; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * + */ +public class SparkExecutableLivy extends SparkExecutable { + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SparkExecutableLivy.class); + + private static final String CLASS_NAME = "className"; + private static final String JARS = "jars"; + private static final String JOB_ID = "jobId"; + private static final String COUNTER_SAVE_AS = "CounterSaveAs"; + private static final String CONFIG_NAME = "configName"; + + public void formatArgs(List<String> args) { + //-className must first + for (Map.Entry<String, String> entry : getParams().entrySet()) { + if (entry.getKey().equals(CLASS_NAME)) { + args.add("-" + entry.getKey()); + args.add(entry.getValue()); + break; + } + } + for (Map.Entry<String, String> entry : getParams().entrySet()) { + if (entry.getKey().equals(CLASS_NAME) || entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID) + || entry.getKey().equals(COUNTER_SAVE_AS) || entry.getKey().equals(CONFIG_NAME)) { + continue; + } else { + args.add("-" + entry.getKey()); + args.add(entry.getValue()); + } + } + } + + @Override + protected void onExecuteStart(ExecutableContext executableContext) { + final Output output = getOutput(); + if (output.getExtra().containsKey(START_TIME)) { + final String sparkJobID = output.getExtra().get(ExecutableConstants.SPARK_JOB_ID); + if (sparkJobID == null) { + getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null); + return; + } + try { + String status = getAppState(sparkJobID); + if (Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status) + || LivyStateEnum.error.name().equalsIgnoreCase(status) + || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)) { + //remove previous mr job info + super.onExecuteStart(executableContext); + } else { + getManager().updateJobOutput(getId(), 
ExecutableState.RUNNING, null, null); + } + } catch (IOException e) { + logger.warn("error get hadoop status"); + super.onExecuteStart(executableContext); + } + } else { + super.onExecuteStart(executableContext); + } + } + + private ExecuteResult onResumed(String appId, ExecutableManager mgr) throws ExecuteException { + Map<String, String> info = new HashMap<>(); + try { + logger.info("livy spark_job_id:" + appId + " resumed"); + info.put(ExecutableConstants.SPARK_JOB_ID, appId); + + while (!isPaused() && !isDiscarded()) { + String status = getAppState(appId); + + if (Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status) + || LivyStateEnum.error.name().equalsIgnoreCase(status) + || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)) { + mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, appId + " has failed"); + return new ExecuteResult(ExecuteResult.State.FAILED, appId + " has failed"); + } + + if (LivyStateEnum.success.name().equalsIgnoreCase(status)) { + mgr.addJobInfo(getId(), info); + return new ExecuteResult(ExecuteResult.State.SUCCEED, appId + " has finished"); + } + + Thread.sleep(5000); + } + + killAppRetry(appId); + + if (isDiscarded()) { + return new ExecuteResult(ExecuteResult.State.DISCARDED, appId + " is discarded"); + } else { + return new ExecuteResult(ExecuteResult.State.STOPPED, appId + " is stopped"); + } + + } catch (Exception e) { + logger.error("error run spark job:", e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } + + } + + @SuppressWarnings("checkstyle:methodlength") + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + ExecutableManager mgr = getManager(); + Map<String, String> extra = mgr.getOutput(getId()).getExtra(); + String sparkJobId = extra.get(ExecutableConstants.SPARK_JOB_ID); + if (!StringUtils.isEmpty(sparkJobId)) { + return onResumed(sparkJobId, mgr); + } else { + String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt()); + CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName); + final KylinConfig config = cube.getConfig(); + + setAlgorithmLayer(); + + LivyRestBuilder livyRestBuilder = new LivyRestBuilder(); + + String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt()); + CubeSegment segment = cube.getSegmentById(segmentID); + Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment); + dumpMetadata(segment, mergingSeg); + + Map<String, String> sparkConfs = config.getSparkConfigOverride(); + String sparkConfigName = getSparkConfigName(); + if (sparkConfigName != null) { + Map<String, String> sparkSpecificConfs = config.getSparkConfigOverrideWithSpecificName(sparkConfigName); + sparkConfs.putAll(sparkSpecificConfs); + } + + for (Map.Entry<String, String> entry : sparkConfs.entrySet()) { + if (entry.getKey().equals("spark.submit.deployMode") || entry.getKey().equals("spark.master") || entry.getKey().equals("spark.yarn.archive")) { + continue; + } else { + livyRestBuilder.addConf(entry.getKey(), entry.getValue()); + } + } + formatArgs(livyRestBuilder.getArgs()); + + final LivyRestExecutor executor = new LivyRestExecutor(); + final PatternedLogger patternedLogger = new PatternedLogger(logger, (infoKey, info) -> { + // only care three properties here + if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey) + || ExecutableConstants.YARN_APP_ID.equals(infoKey) + || ExecutableConstants.YARN_APP_URL.equals(infoKey)) { + getManager().addJobInfo(getId(), 
info); + } + }); + + try { + livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.job); + executor.execute(livyRestBuilder, patternedLogger); + if (isDiscarded()) { + return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded"); + } + if (isPaused()) { + return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped"); + } + // done, update all properties + Map<String, String> joblogInfo = patternedLogger.getInfo(); + // read counter from hdfs + String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT); + if (counterOutput != null) { + if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) { + Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput); + joblogInfo.putAll(counterMap); + } else { + logger.warn("Spark counter output path not exists: " + counterOutput); + } + } + readCounters(joblogInfo); + getManager().addJobInfo(getId(), joblogInfo); + return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("error run spark job:", e); + // clear SPARK_JOB_ID on job failure. + extra = mgr.getOutput(getId()).getExtra(); + extra.put(ExecutableConstants.SPARK_JOB_ID, ""); + getManager().addJobInfo(getId(), extra); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage()); + } + } + } + + private String getAppState(String appId) throws IOException { + LivyRestExecutor executor = new LivyRestExecutor(); + return executor.state(appId); + } + + private void killApp(String appId) throws IOException, InterruptedException { + LivyRestExecutor executor = new LivyRestExecutor(); + executor.kill(appId); + } + + private int killAppRetry(String appId) throws IOException, InterruptedException { + String status = getAppState(appId); + if (Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status) + || LivyStateEnum.error.name().equalsIgnoreCase(status) + || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status)) { + logger.warn(appId + "is final state, no need to kill"); + return 0; + } + + killApp(appId); + + status = getAppState(appId); + int retry = 0; + while (Strings.isNullOrEmpty(status) || LivyStateEnum.dead.name().equalsIgnoreCase(status) + || LivyStateEnum.error.name().equalsIgnoreCase(status) + || LivyStateEnum.shutting_down.name().equalsIgnoreCase(status) && retry < 5) { + killApp(appId); + + Thread.sleep(1000); + + status = getAppState(appId); + retry++; + } + + if (Strings.isNullOrEmpty(status)) { + logger.info(appId + " killed successfully"); + return 0; + } else { + logger.info(appId + " killed failed"); + return 1; + } + } + +} diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlOnLivyBatch.scala b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlOnLivyBatch.scala new file mode 100644 index 0000000..8ea2cfa --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkSqlOnLivyBatch.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark + +import java.util.Locale + +import org.apache.commons.lang.StringUtils +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +object SparkSqlOnLivyBatch extends Logging{ + + def main(args: Array[String]) { + + if (args.length != 1) { + log.info("Usage: SparkSqlOnLivyBatch <sqlstring>") + System.exit(1) + } + + val sql : String = args(0) + log.info(String.format(Locale.ROOT, "Sql-Info : %s", sql)) + + val spark = SparkSession.builder().enableHiveSupport().appName("kylin-sql-livy").getOrCreate() + + val sqlStrings = sql.split(";") + + for (sqlString <- sqlStrings) { + var item = sqlString.trim() + if (item.length > 0) { + if (StringUtils.endsWith(item, "\\")) { + item = StringUtils.chop(item) + } + spark.sql(item) + } + } + } +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java new file mode 100644 index 0000000..3549630 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableByLivyStep.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.source.hive; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.livy.LivyRestBuilder; +import org.apache.kylin.common.livy.LivyRestExecutor; +import org.apache.kylin.common.livy.LivyTypeEnum; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.common.PatternedLogger; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * + */ +public class CreateFlatHiveTableByLivyStep extends AbstractExecutable { + private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableByLivyStep.class); + protected final PatternedLogger stepLogger = new PatternedLogger(logger); + private static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';"); + + + protected void createFlatHiveTable(KylinConfig config) throws Exception { + final LivyRestBuilder livyRestBuilder = new LivyRestBuilder(); + livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride()); + livyRestBuilder.addArgs(livyRestBuilder.parseProps() + getInitStatement() + getCreateTableStatement()); + + stepLogger.log("Create and distribute table. 
"); + livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql); + + LivyRestExecutor executor = new LivyRestExecutor(); + executor.execute(livyRestBuilder, stepLogger); + + Map<String, String> info = stepLogger.getInfo(); + //get the flat Hive table size + Matcher matcher = HDFS_LOCATION.matcher(getCreateTableStatement()); + if (matcher.find()) { + String hiveFlatTableHdfsUrl = matcher.group(1); + long size = getFileSize(hiveFlatTableHdfsUrl); + info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size); + logger.info("HDFS_Bytes_Writen: " + size); + } + getManager().addJobInfo(getId(), info); + } + + private long getFileSize(String hdfsUrl) throws IOException { + Configuration configuration = new Configuration(); + Path path = new Path(hdfsUrl); + FileSystem fs = path.getFileSystem(configuration); + ContentSummary contentSummary = fs.getContentSummary(path); + long length = contentSummary.getLength(); + return length; + } + + + private KylinConfig getCubeSpecificConfig() { + String cubeName = CubingExecutableUtil.getCubeName(getParams()); + CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = manager.getCube(cubeName); + return cube.getConfig(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + KylinConfig config = getCubeSpecificConfig(); + try { + createFlatHiveTable(config); + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("job:" + getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog(), e); + } + } + + public void setInitStatement(String sql) { + setParam("HiveInit", sql); + } + + public String getInitStatement() { + return getParam("HiveInit"); + } + + public void setCreateTableStatement(String sql) { + setParam("HiveRedistributeData", sql); + } + + public String getCreateTableStatement() { + return getParam("HiveRedistributeData"); + } +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index 2f25e50..4782920 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -44,6 +44,7 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.IEngineAware; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.TableDesc; @@ -84,8 +85,13 @@ public class HiveInputBase { // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, - cubeInstance.getDescriptor())); + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + //jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); + if (kylinConfig.enableLivy() && cubeInstance.getEngineType() == IEngineAware.ID_SPARK) { + jobFlow.addTask(createRedistributeFlatHiveTableByLivyStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); + } else { + 
jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); + } } // special for hive @@ -97,7 +103,15 @@ public class HiveInputBase { final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir); - jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc)); + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + + if (kylinConfig.enableLivy() && cubeInstance.getEngineType() == IEngineAware.ID_SPARK) { + jobFlow.addTask(createFlatHiveTableByLivyStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc)); + } else { + jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc)); + } + //jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc)); } protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) { @@ -152,6 +166,21 @@ public class HiveInputBase { return step; } + protected static AbstractExecutable createFlatHiveTableByLivyStep(String hiveInitStatements, String jobWorkingDir, + String cubeName, IJoinedFlatTableDesc flatDesc) { + //from hive to hive + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir); + String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc); + + CreateFlatHiveTableByLivyStep step = new CreateFlatHiveTableByLivyStep(); + step.setInitStatement(hiveInitStatements); + step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls); + CubingExecutableUtil.setCubeName(cubeName, step.getParams()); + step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); + return step; + } + protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName, IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); @@ -163,6 +192,17 @@ public class HiveInputBase { return step; } + protected static AbstractExecutable createRedistributeFlatHiveTableByLivyStep(String hiveInitStatements, String cubeName, + IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { + RedistributeFlatHiveTableByLivyStep step = new RedistributeFlatHiveTableByLivyStep(); + step.setInitStatement(hiveInitStatements); + step.setIntermediateTable(flatDesc.getTableName()); + step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc, cubeDesc)); + CubingExecutableUtil.setCubeName(cubeName, step.getParams()); + step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE); + return step; + } + protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables, String uuid) { ShellExecutable step = new ShellExecutable(); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java new file mode 100644 index 0000000..4c07324 --- /dev/null +++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableByLivyStep.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.source.hive; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.livy.LivyRestBuilder; +import org.apache.kylin.common.livy.LivyRestExecutor; +import org.apache.kylin.common.livy.LivyTypeEnum; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.common.PatternedLogger; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; + + +public class RedistributeFlatHiveTableByLivyStep extends AbstractExecutable { + private final PatternedLogger stepLogger = new PatternedLogger(logger); + + private long computeRowCount(String database, String table) throws Exception { + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + return hiveClient.getHiveTableRows(database, table); + } + + private long getDataSize(String database, String table) throws Exception { + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + long size = hiveClient.getHiveTableMeta(database, table).fileSize; + return size; + } + + private void redistributeTable(KylinConfig config, int numReducers) throws Exception { + final LivyRestBuilder livyRestBuilder = new LivyRestBuilder(); + livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride()); + StringBuffer statement = new StringBuffer(); + statement.append(livyRestBuilder.parseProps()); + statement.append(getInitStatement()); + statement.append("set mapreduce.job.reduces=" + numReducers + ";\n"); + statement.append("set hive.merge.mapredfiles=false;\n"); + statement.append(getRedistributeDataStatement()); + livyRestBuilder.addArgs(statement.toString()); + final String cmd = livyRestBuilder.toString(); + + stepLogger.log("Redistribute table, cmd: "); + stepLogger.log(cmd); + + livyRestBuilder.setLivyTypeEnum(LivyTypeEnum.sql); + + LivyRestExecutor executor = new LivyRestExecutor(); + executor.execute(livyRestBuilder, stepLogger); + getManager().addJobInfo(getId(), stepLogger.getInfo()); + } + + private KylinConfig getCubeSpecificConfig() { + String cubeName = CubingExecutableUtil.getCubeName(getParams()); + CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = manager.getCube(cubeName); + return cube.getConfig(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) 
throws ExecuteException { + KylinConfig config = getCubeSpecificConfig(); + String intermediateTable = getIntermediateTable(); + String database, tableName; + if (intermediateTable.indexOf(".") > 0) { + database = intermediateTable.substring(0, intermediateTable.indexOf(".")); + tableName = intermediateTable.substring(intermediateTable.indexOf(".") + 1); + } else { + database = config.getHiveDatabaseForIntermediateTable(); + tableName = intermediateTable; + } + + try { + long rowCount = computeRowCount(database, tableName); + logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount); + if (rowCount == 0) { + if (!config.isEmptySegmentAllowed()) { + stepLogger.log("Detect upstream hive table is empty, " + + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\""); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } else { + return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute"); + } + } + + int mapperInputRows = config.getHadoopJobMapperInputRows(); + + int numReducers = Math.round(rowCount / ((float) mapperInputRows)); + numReducers = Math.max(1, numReducers); + numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber()); + + stepLogger.log("total input rows = " + rowCount); + stepLogger.log("expected input rows per mapper = " + mapperInputRows); + stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers); + + redistributeTable(config, numReducers); + long dataSize = getDataSize(database, tableName); + getManager().addJobInfo(getId(), ExecutableConstants.HDFS_BYTES_WRITTEN, "" + dataSize); + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("job:" + getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog(), e); + } + } + + public void setInitStatement(String sql) { + setParam("HiveInit", sql); + } + + public String getInitStatement() { + return getParam("HiveInit"); + } + + public void setRedistributeDataStatement(String sql) { + setParam("HiveRedistributeData", sql); + } + + public String getRedistributeDataStatement() { + return getParam("HiveRedistributeData"); + } + + public String getIntermediateTable() { + return getParam("intermediateTable"); + } + + public void setIntermediateTable(String intermediateTable) { + setParam("intermediateTable", intermediateTable); + } +} \ No newline at end of file diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java index 86ad0fb..1f35de4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java @@ -25,6 +25,7 @@ import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2; import org.apache.kylin.engine.spark.SparkExecutable; +import org.apache.kylin.engine.spark.SparkExecutableFactory; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.execution.AbstractExecutable; @@ -39,7 +40,7 @@ public class HBaseSparkSteps extends HBaseJobSteps { String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? 
"" : "/"); SparkBatchCubingJobBuilder2 jobBuilder2 = new SparkBatchCubingJobBuilder2(seg, null); - final SparkExecutable sparkExecutable = new SparkExecutable(); + final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig().enableLivy()); sparkExecutable.setClassName(SparkCubeHFile.class.getName()); sparkExecutable.setParam(SparkCubeHFile.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); sparkExecutable.setParam(SparkCubeHFile.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());