This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new fe04ca8  KYLIN-4613 add build cubev2 and JobRestClient
fe04ca8 is described below

commit fe04ca8d3be6c50742246663d6fd6767a568bcf8
Author: chuxiao <chux...@didichuxing.com>
AuthorDate: Wed Jun 5 21:45:52 2019 +0800

    KYLIN-4613 add build cubev2 and JobRestClient
---
 .../java/org/apache/kylin/common/KylinConfig.java  |  16 +-
 .../apache/kylin/common/restclient/RestClient.java |  16 +-
 .../org/apache/kylin/job/util/JobRestClient.java   | 133 +++++++++++++++++
 .../apache/kylin/tool/job/CubeBuildingCLIV2.java   | 161 +++++++++++++++++++++
 4 files changed, 316 insertions(+), 10 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 30cf71c..58b1418 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -134,7 +134,7 @@ public class KylinConfig extends KylinConfigBase {
         }
     }
 
-    public static KylinConfig getInstanceFromEnv() {
+    public static KylinConfig getInstanceFromEnv(boolean allowConfigFileNoExist) {
         synchronized (KylinConfig.class) {
             KylinConfig config = THREAD_ENV_INSTANCE.get();
             if (config != null) {
@@ -148,7 +148,15 @@ public class KylinConfig extends KylinConfigBase {
                     buildDefaultOrderedProperties();
 
                     config = new KylinConfig();
-                    config.reloadKylinConfig(buildSiteProperties());
+                    try {
+                        config.reloadKylinConfig(buildSiteProperties());
+                    } catch (KylinConfigCannotInitException e) {
+                        logger.info("Kylin Config Can not Init Exception");
+                        if (!allowConfigFileNoExist) {
+                            throw e;
+                        }
+                    }
+
                     VersionUtil.loadKylinVersion();
                     logger.info("Initialized a new KylinConfig from getInstanceFromEnv : {}",
                             System.identityHashCode(config));
@@ -161,6 +169,10 @@ public class KylinConfig extends KylinConfigBase {
         }
     }
 
+    public static KylinConfig getInstanceFromEnv() {
+        return getInstanceFromEnv(false);
+    }
+
     // Only used in test cases!!!
     public static void destroyInstance() {
         synchronized (KylinConfig.class) {
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index cc96418..fcd8706 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -61,10 +61,10 @@ import com.google.common.base.Strings;
 public class RestClient {
 
     private static final Logger logger = LoggerFactory.getLogger(RestClient.class);
-    private static final String UTF_8 = "UTF-8";
+    protected static final String UTF_8 = "UTF-8";
     private static final String APPLICATION_JSON = "application/json";
-    private static final String INVALID_RESPONSE = "Invalid response ";
-    private static final String CUBES = "/cubes/";
+    protected static final String INVALID_RESPONSE = "Invalid response ";
+    protected static final String CUBES = "/cubes/";
     private static final String WITH_URL = " with url ";
 
     protected static Pattern fullRestPattern = Pattern.compile("(?:([^:]+)[:]([^@]+)[@])?([^:]+)(?:[:](\\d+))?");
@@ -142,7 +142,7 @@ public class RestClient {
         HttpConnectionParams.setConnectionTimeout(httpParams, httpConnectionTimeoutMs);
 
         final PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        KylinConfig config = KylinConfig.getInstanceFromEnv(true);
         cm.setDefaultMaxPerRoute(config.getRestClientDefaultMaxPerRoute());
         cm.setMaxTotal(config.getRestClientMaxTotal());
 
@@ -434,13 +434,13 @@ public class RestClient {
         return post;
     }
 
-    private HttpPut newPut(String url) {
+    protected HttpPut newPut(String url) {
         HttpPut put = new HttpPut(url);
         addHttpHeaders(put);
         return put;
     }
 
-    private HttpGet newGet(String url) {
+    protected HttpGet newGet(String url) {
         HttpGet get = new HttpGet(url);
         addHttpHeaders(get);
         return get;
@@ -467,7 +467,7 @@ public class RestClient {
         }
     }
 
-    private String getContent(HttpResponse response) throws IOException {
+    protected String getContent(HttpResponse response) throws IOException {
         StringBuffer result = new StringBuffer();
         try (BufferedReader rd = new BufferedReader(
                 new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
@@ -480,7 +480,7 @@ public class RestClient {
         return result.toString();
     }
 
-    private void cleanup(HttpRequestBase request, HttpResponse response) {
+    protected void cleanup(HttpRequestBase request, HttpResponse response) {
         try {
             if (response != null)
                 EntityUtils.consume(response.getEntity());
diff --git a/core-job/src/main/java/org/apache/kylin/job/util/JobRestClient.java b/core-job/src/main/java/org/apache/kylin/job/util/JobRestClient.java
new file mode 100644
index 0000000..9b52f35
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/util/JobRestClient.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.util;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.job.JobInstance;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class JobRestClient extends RestClient {
+    private static final String JOBS = "/jobs/";
+
+    public JobRestClient(String host, int port, String userName, String password) {
+        this(host, port, userName, password, null, null);
+    }
+
+    public JobRestClient(String host, int port, String userName, String password, Integer httpConnectionTimeoutMs, Integer httpSocketTimeoutMs) {
+        super(host, port, userName, password, httpConnectionTimeoutMs, httpSocketTimeoutMs);
+    }
+
+    public JobInstance buildCubeV2(String cubeName, long startTime, long endTime, CubeBuildTypeEnum buildType) throws IOException {
+        String url = baseUrl + CUBES + cubeName + "/build";
+        HttpPut put = newPut(url);
+        HttpResponse response = null;
+        try {
+            HashMap<String, String> paraMap = new HashMap<String, String>();
+            paraMap.put("startTime", startTime + "");
+            paraMap.put("endTime", endTime + "");
+            paraMap.put("buildType", buildType.toString());
+            String jsonMsg = new ObjectMapper().writeValueAsString(paraMap);
+            put.setEntity(new StringEntity(jsonMsg, UTF_8));
+            response = client.execute(put);
+            String result = getContent(response);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+                        + result + " with build cube url " + url + "\n" + jsonMsg);
+            } else {
+                return json2JobInstance(result);
+            }
+        } finally {
+            cleanup(put, response);
+        }
+    }
+
+    protected JobInstance json2JobInstance(String json) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        JobInstance jobInstance = mapper.readValue(json, JobInstance.class);
+        return jobInstance;
+    }
+
+    public JobInstance getJobStatus(String jobId) throws IOException {
+        String url = baseUrl + JOBS + jobId;
+        HttpGet get = newGet(url);
+        HttpResponse response = null;
+        try {
+            response = client.execute(get);
+            String result = getContent(response);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+                        + result + " with get job status " + jobId);
+            } else {
+                return json2JobInstance(result);
+            }
+        } finally {
+            cleanup(get, response);
+        }
+    }
+
+    public String JobInstance2JsonString(JobInstance jobInstance) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        String jsonString = mapper.writeValueAsString(jobInstance);
+        return jsonString;
+    }
+
+    public JobInstance resumeJob(String jobId) throws IOException {
+        String url = baseUrl + JOBS + jobId + "/resume";
+        HttpPut put = newPut(url);
+        HttpResponse response = null;
+        try {
+            response = client.execute(put);
+            String result = getContent(response);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+                        + result + " with resume job " + jobId);
+            } else {
+                return json2JobInstance(result);
+            }
+        } finally {
+            cleanup(put, response);
+        }
+    }
+
+    public void discardJob(String jobId) throws IOException {
+        String url = baseUrl + JOBS + jobId + "/cancel";
+        HttpPut put = newPut(url);
+        HttpResponse response = null;
+        try {
+            response = client.execute(put);
+            String result = getContent(response);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+                        + result + " with discard job " + jobId);
+            }
+        } finally {
+            cleanup(put, response);
+        }
+    }
+}
diff --git a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLIV2.java b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLIV2.java
new file mode 100644
index 0000000..bb66db2
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLIV2.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.tool.job;
+
+import com.google.common.base.Strings;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.util.JobRestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class CubeBuildingCLIV2 extends AbstractApplication {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeBuildingCLIV2.class);
+
+    private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(true)
+            .withDescription("Specify for which cube to build").create("cube");
+    private static final Option OPTION_BUILD_TYPE = OptionBuilder.withArgName("buildType").hasArg().isRequired(false)
+            .withDescription("Specify for the build type").create("buildType");
+    private static final Option OPTION_TIME_START = OptionBuilder.withArgName("startTime").hasArg().isRequired(false)
+            .withDescription("Specify the start time of the segment").create("startTime");
+    private static final Option OPTION_TIME_END = OptionBuilder.withArgName("endTime").hasArg().isRequired(true)
+            .withDescription("Specify the end time of the segment").create("endTime");
+    private static final Option OPTION_HOST = OptionBuilder.withArgName("host").hasArg().isRequired(true)
+            .withDescription("Specify the kylin server host").create("host");
+    private static final Option OPTION_PORT = OptionBuilder.withArgName("port").hasArg().isRequired(true)
+            .withDescription("Specify the kylin server port").create("port");
+    private static final Option OPTION_USER_NAME = OptionBuilder.withArgName("userName").hasArg().isRequired(true)
+            .withDescription("Specify the kylin server user name").create("userName");
+    private static final Option OPTION_PASSWORD= OptionBuilder.withArgName("password").hasArg().isRequired(true)
+            .withDescription("Specify the kylin server password").create("password");
+    private static final Option OPTION_WAITING_FOR_END = OptionBuilder.withArgName("waitingForEnd").hasArg().isRequired(false)
+            .withDescription("Specify whether waiting for end").create("waitingForEnd");
+    private static final Option OPTION_RETRY_NUMBER = OptionBuilder.withArgName("retryNumber").hasArg().isRequired(false)
+            .withDescription("Specify retry number when execute failed").create("retryNumber");
+    private static final Option OPTION_DISCARD_ERROR_JOB = OptionBuilder.withArgName("discardErrorJob").hasArg().isRequired(false)
+            .withDescription("Specify discard job when execute failed").create("discardErrorJob");
+
+    private final Options options;
+
+    public CubeBuildingCLIV2() {
+        options = new Options();
+        options.addOption(OPTION_CUBE);
+        options.addOption(OPTION_BUILD_TYPE);
+        options.addOption(OPTION_TIME_START);
+        options.addOption(OPTION_TIME_END);
+        options.addOption(OPTION_HOST);
+        options.addOption(OPTION_PORT);
+        options.addOption(OPTION_USER_NAME);
+        options.addOption(OPTION_PASSWORD);
+        options.addOption(OPTION_WAITING_FOR_END);
+        options.addOption(OPTION_RETRY_NUMBER);
+        options.addOption(OPTION_DISCARD_ERROR_JOB);
+    }
+
+    protected Options getOptions() {
+        return options;
+    }
+
+    protected void execute(OptionsHelper optionsHelper) throws IOException {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
+        String buildType = optionsHelper.getOptionValue(OPTION_BUILD_TYPE);
+        if (Strings.isNullOrEmpty(buildType)) {
+            buildType = "BUILD";
+        }
+        Long startTime = 0L;
+        if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_TIME_START))) {
+            startTime = Long.parseLong(optionsHelper.getOptionValue(OPTION_TIME_START));
+        }
+        Long endTime = Long.parseLong(optionsHelper.getOptionValue(OPTION_TIME_END));
+        String host = optionsHelper.getOptionValue(OPTION_HOST);
+        Integer port = Integer.parseInt(optionsHelper.getOptionValue(OPTION_PORT));
+
+        String userName = optionsHelper.getOptionValue(OPTION_USER_NAME);
+        String password = optionsHelper.getOptionValue(OPTION_PASSWORD);
+
+        Boolean waitingForEnd = true;
+        if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_WAITING_FOR_END))) {
+            waitingForEnd = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_WAITING_FOR_END));
+        }
+        JobRestClient client = new JobRestClient(host, port, userName, password);
+        System.out.println("start building cube.");
+        JobInstance jobInstance = submitJob(client, cubeName, startTime, endTime, buildType);
+        if (waitingForEnd) {
+            int retryNumber = 0;
+            if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_RETRY_NUMBER))) {
+                retryNumber = Integer.parseInt(optionsHelper.getOptionValue(OPTION_RETRY_NUMBER));
+            }
+            while (!jobInstance.getStatus().isComplete()) {
+                try {
+                    Thread.sleep(30000);
+                } catch (InterruptedException e) {
+                    System.err.println("Thread interrupted, exit");
+                    System.exit(-1);
+                }
+                jobInstance = client.getJobStatus(jobInstance.getId());
+                System.out.println("job " + jobInstance.getId() + " get status : " + jobInstance.getStatus());
+                if (jobInstance.getStatus().equals(JobStatusEnum.ERROR) && retryNumber > 0) {
+                    System.out.println("retry count is " + retryNumber);
+                    retryNumber--;
+                    jobInstance = client.resumeJob(jobInstance.getId());
+                }
+            }
+            if (!jobInstance.getStatus().equals(JobStatusEnum.FINISHED)) {
+                boolean discardErrorJob = false;
+                if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_DISCARD_ERROR_JOB))) {
+                    discardErrorJob = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DISCARD_ERROR_JOB));
+                }
+                if (discardErrorJob) {
+                    client.discardJob(jobInstance.getId());
+                }
+                System.exit(-1);
+            }
+
+        }
+    }
+
+    private JobInstance submitJob(JobRestClient client, String cubeName, long startDate, long endDate, String buildType) throws IOException {
+        CubeBuildTypeEnum buildTypeEnum = CubeBuildTypeEnum.valueOf(buildType);
+        JobInstance jobInstance = client.buildCubeV2(cubeName, startDate, endDate, buildTypeEnum);
+        System.out.println("building cube job:");
+        System.out.println(client.JobInstance2JsonString(jobInstance));
+        return jobInstance;
+    }
+
+
+    public static void main(String[] args) {
+        CubeBuildingCLIV2 cli = new CubeBuildingCLIV2();
+        try {
+            cli.execute(args);
+            System.exit(0);
+        } catch (Exception e) {
+            logger.error("error running cube building", e);
+            System.exit(-1);
+        }
+    }
+}