This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 17dcf01 KYLIN-4612 support job status writing to kafka 17dcf01 is described below commit 17dcf010ca9f0e2fd85761b94419df4ee5d80b94 Author: chuxiao <crow...@163.com> AuthorDate: Mon Jul 8 10:33:05 2019 +0800 KYLIN-4612 support job status writing to kafka --- core-common/pom.xml | 5 ++ .../org/apache/kylin/common/KylinConfigBase.java | 8 ++ .../kylin/common/kafka/KafkaMsgProducer.java | 93 ++++++++++++++++++++++ .../kylin/job/execution/ExecutableManager.java | 58 ++++++++++++++ 4 files changed, 164 insertions(+) diff --git a/core-common/pom.xml b/core-common/pom.xml index 5ec2838..bff8797 100644 --- a/core-common/pom.xml +++ b/core-common/pom.xml @@ -73,6 +73,11 @@ <artifactId>jsch</artifactId> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + </dependency> + <!-- Env & Test --> <dependency> <groupId>junit</groupId> diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 56a7a3f..af3b4ae 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2678,4 +2678,12 @@ public abstract class KylinConfigBase implements Serializable { public int getDefaultTimeFilter() { return Integer.parseInt(getOptional("kylin.web.default-time-filter", "2")); } + + public boolean jobStatusWriteKafka() { + return Boolean.parseBoolean(getOptional("kylin.engine.job-status.write.kafka", FALSE)); + } + + public Map<String, String> getJobStatusKafkaConfig() { + return getPropertiesByPrefix("kylin.engine.job-status.kafka."); + } } diff --git a/core-common/src/main/java/org/apache/kylin/common/kafka/KafkaMsgProducer.java b/core-common/src/main/java/org/apache/kylin/common/kafka/KafkaMsgProducer.java new file mode 100644 index 0000000..98068a8 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/kafka/KafkaMsgProducer.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.kafka; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kylin.common.KylinConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Properties; + +public class KafkaMsgProducer { + private static final Logger logger = LoggerFactory.getLogger(KafkaMsgProducer.class); + + private static Producer<String, String> producer; + private static Map<String, String> kafkaConfig; + private static String TOPIC_NAME; + + + private static Properties kafkaProperties = new Properties() { + { + put(ProducerConfig.ACKS_CONFIG, "-1"); + put(ProducerConfig.RETRIES_CONFIG, 3); + put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); + put(ProducerConfig.LINGER_MS_CONFIG, 500); + put(ProducerConfig.BATCH_SIZE_CONFIG, 10000); + put("max.in.flight.requests.per.connection", 1); + } + }; + + + private KafkaMsgProducer() { + init(); + } + + private static class BasicProducerHolder { + private static final KafkaMsgProducer INSTANCE = new KafkaMsgProducer(); + } + + public static final KafkaMsgProducer getInstance() { + return BasicProducerHolder.INSTANCE; + } + + public void init() { + if (null == kafkaConfig) { + kafkaConfig = KylinConfig.getInstanceFromEnv().getJobStatusKafkaConfig(); + } + if (null == producer) { + kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.get("bootstrap.servers")); + for(Map.Entry<String, String> entry : kafkaConfig.entrySet()){ + kafkaProperties.put(entry.getKey(), entry.getValue()); + } + producer = new KafkaProducer<>(kafkaProperties); + } + if (null == TOPIC_NAME) { + TOPIC_NAME = kafkaConfig.get("topic.name"); + } + } + + public void sendJobStatusMessage(String message) { + sendMessage(message); + } + + private void sendMessage(final String message) { + ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message); + producer.send(record, (recordMetadata, exception) -> { + if (null != exception) { + logger.error("kafka send message error.", exception); + } + }); + } + +} diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 06f291b..791011c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -32,8 +32,10 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.kafka.KafkaMsgProducer; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.dao.ExecutableDao; import org.apache.kylin.job.dao.ExecutableOutputPO; import org.apache.kylin.job.dao.ExecutablePO; @@ -65,11 +67,15 @@ public class ExecutableManager { private final KylinConfig config; private final ExecutableDao executableDao; + private KafkaMsgProducer kafkaMsgProducer; private ExecutableManager(KylinConfig config) { logger.info("Using metadata url: " + config); this.config = config; this.executableDao = ExecutableDao.getInstance(config); + if (config.jobStatusWriteKafka()) { + this.kafkaMsgProducer = KafkaMsgProducer.getInstance(); + } } private static ExecutablePO parse(AbstractExecutable executable) { @@ -477,12 +483,64 @@ public class ExecutableManager { } executableDao.updateJobOutput(jobOutput); logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus); + + //write status to kafka + if (config.jobStatusWriteKafka()) { + AbstractExecutable executable = getJob(jobId); + if (executable == null) { + return; + } + if (executable instanceof DefaultChainedExecutable) { + StringBuffer result = new StringBuffer(); + + DefaultChainedExecutable job = (DefaultChainedExecutable)executable; + + result.append("{"); + + result.append("\"jobId\":\"" + job.getId() + "\","); + result.append("\"jobName\":\"" + job.getName() + "\","); + result.append("\"status\":\"" + parseToJobStatus(job.getStatus()).name() + "\","); + result.append("\"subTaskSize\": \"" + job.getTasks().size() + "\","); + + result.append("\"subTasks\":["); + job.getTasks().forEach(item -> { + result.append("{"); + result.append("\"jobId\":\"" + item.getId() + "\","); + result.append("\"jobName\":\"" + item.getName() + "\","); + result.append("\"status\":\"" + parseToJobStatus(item.getStatus()).name() + "\""); + result.append("},"); + }); + String resultStr = result.substring(0, result.length() - 1); + resultStr += "]}"; + + kafkaMsgProducer.sendJobStatusMessage(resultStr); + } + } } catch (PersistentException e) { logger.error("error change job:" + jobId + " to " + newStatus); throw new RuntimeException(e); } } + private JobStatusEnum parseToJobStatus(ExecutableState state) { + switch (state) { + case READY: + return JobStatusEnum.PENDING; + case RUNNING: + return JobStatusEnum.RUNNING; + case ERROR: + return JobStatusEnum.ERROR; + case DISCARDED: + return JobStatusEnum.DISCARDED; + case SUCCEED: + return JobStatusEnum.FINISHED; + case STOPPED: + return JobStatusEnum.STOPPED; + default: + throw new RuntimeException("invalid state:" + state); + } + } + public void reloadAll() throws IOException { executableDao.reloadAll(); }