This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 17dcf01 KYLIN-4612 support job status writing to kafka
17dcf01 is described below
commit 17dcf010ca9f0e2fd85761b94419df4ee5d80b94
Author: chuxiao <[email protected]>
AuthorDate: Mon Jul 8 10:33:05 2019 +0800
KYLIN-4612 support job status writing to kafka
---
core-common/pom.xml | 5 ++
.../org/apache/kylin/common/KylinConfigBase.java | 8 ++
.../kylin/common/kafka/KafkaMsgProducer.java | 93 ++++++++++++++++++++++
.../kylin/job/execution/ExecutableManager.java | 58 ++++++++++++++
4 files changed, 164 insertions(+)
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 5ec2838..bff8797 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -73,6 +73,11 @@
<artifactId>jsch</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ </dependency>
+
<!-- Env & Test -->
<dependency>
<groupId>junit</groupId>
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 56a7a3f..af3b4ae 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2678,4 +2678,12 @@ public abstract class KylinConfigBase implements Serializable {
public int getDefaultTimeFilter() {
return Integer.parseInt(getOptional("kylin.web.default-time-filter", "2"));
}
+
+ public boolean jobStatusWriteKafka() {
+ return Boolean.parseBoolean(getOptional("kylin.engine.job-status.write.kafka", FALSE));
+ }
+
+ public Map<String, String> getJobStatusKafkaConfig() {
+ return getPropertiesByPrefix("kylin.engine.job-status.kafka.");
+ }
}
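The two getters above read kylin.properties entries. A minimal configuration sketch, assuming placeholder broker addresses and topic name (the bootstrap.servers and topic.name keys are the ones KafkaMsgProducer below pulls out of the kylin.engine.job-status.kafka.* prefix):

    kylin.engine.job-status.write.kafka=true
    kylin.engine.job-status.kafka.bootstrap.servers=broker1:9092,broker2:9092
    kylin.engine.job-status.kafka.topic.name=kylin_job_status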
diff --git a/core-common/src/main/java/org/apache/kylin/common/kafka/KafkaMsgProducer.java b/core-common/src/main/java/org/apache/kylin/common/kafka/KafkaMsgProducer.java
new file mode 100644
index 0000000..98068a8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/kafka/KafkaMsgProducer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaMsgProducer {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaMsgProducer.class);
+
+ private static Producer<String, String> producer;
+ private static Map<String, String> kafkaConfig;
+ private static String TOPIC_NAME;
+
+
+ private static Properties kafkaProperties = new Properties() {
+ {
+ put(ProducerConfig.ACKS_CONFIG, "-1");
+ put(ProducerConfig.RETRIES_CONFIG, 3);
+ put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
+ put(ProducerConfig.LINGER_MS_CONFIG, 500);
+ put(ProducerConfig.BATCH_SIZE_CONFIG, 10000);
+ put("max.in.flight.requests.per.connection", 1);
+ }
+ };
+
+
+ private KafkaMsgProducer() {
+ init();
+ }
+
+ private static class BasicProducerHolder {
+ private static final KafkaMsgProducer INSTANCE = new KafkaMsgProducer();
+ }
+
+ public static final KafkaMsgProducer getInstance() {
+ return BasicProducerHolder.INSTANCE;
+ }
+
+ public void init() {
+ if (null == kafkaConfig) {
+ kafkaConfig = KylinConfig.getInstanceFromEnv().getJobStatusKafkaConfig();
+ }
+ if (null == producer) {
+ kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.get("bootstrap.servers"));
+ for(Map.Entry<String, String> entry : kafkaConfig.entrySet()){
+ kafkaProperties.put(entry.getKey(), entry.getValue());
+ }
+ producer = new KafkaProducer<>(kafkaProperties);
+ }
+ if (null == TOPIC_NAME) {
+ TOPIC_NAME = kafkaConfig.get("topic.name");
+ }
+ }
+
+ public void sendJobStatusMessage(String message) {
+ sendMessage(message);
+ }
+
+ private void sendMessage(final String message) {
+ ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
+ producer.send(record, (recordMetadata, exception) -> {
+ if (null != exception) {
+ logger.error("kafka send message error.", exception);
+ }
+ });
+ }
+
+}
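A minimal usage sketch for the new producer, assuming a KylinConfig environment that already carries the kylin.engine.job-status.kafka.* settings; the class name JobStatusKafkaExample and the payload string are illustrative only:

    import org.apache.kylin.common.kafka.KafkaMsgProducer;

    public class JobStatusKafkaExample {
        public static void main(String[] args) {
            // getInstance() lazily builds the singleton; init() copies the
            // kylin.engine.job-status.kafka.* entries into the producer
            // properties and resolves topic.name.
            KafkaMsgProducer producer = KafkaMsgProducer.getInstance();
            // Asynchronous, fire-and-forget send to the configured topic;
            // failures are only logged by the send callback.
            producer.sendJobStatusMessage("{\"jobId\":\"demo\",\"status\":\"RUNNING\"}");
        }
    }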
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 06f291b..791011c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -32,8 +32,10 @@ import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.kafka.KafkaMsgProducer;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.dao.ExecutableDao;
import org.apache.kylin.job.dao.ExecutableOutputPO;
import org.apache.kylin.job.dao.ExecutablePO;
@@ -65,11 +67,15 @@ public class ExecutableManager {
private final KylinConfig config;
private final ExecutableDao executableDao;
+ private KafkaMsgProducer kafkaMsgProducer;
private ExecutableManager(KylinConfig config) {
logger.info("Using metadata url: " + config);
this.config = config;
this.executableDao = ExecutableDao.getInstance(config);
+ if (config.jobStatusWriteKafka()) {
+ this.kafkaMsgProducer = KafkaMsgProducer.getInstance();
+ }
}
private static ExecutablePO parse(AbstractExecutable executable) {
@@ -477,12 +483,64 @@ public class ExecutableManager {
}
executableDao.updateJobOutput(jobOutput);
logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
+
+ //write status to kafka
+ if (config.jobStatusWriteKafka()) {
+ AbstractExecutable executable = getJob(jobId);
+ if (executable == null) {
+ return;
+ }
+ if (executable instanceof DefaultChainedExecutable) {
+ StringBuffer result = new StringBuffer();
+
+ DefaultChainedExecutable job = (DefaultChainedExecutable)executable;
+
+ result.append("{");
+
+ result.append("\"jobId\":\"" + job.getId() + "\",");
+ result.append("\"jobName\":\"" + job.getName() + "\",");
+ result.append("\"status\":\"" +
parseToJobStatus(job.getStatus()).name() + "\",");
+ result.append("\"subTaskSize\": \"" +
job.getTasks().size() + "\",");
+
+ result.append("\"subTasks\":[");
+ job.getTasks().forEach(item -> {
+ result.append("{");
+ result.append("\"jobId\":\"" + item.getId() + "\",");
+ result.append("\"jobName\":\"" + item.getName() +
"\",");
+ result.append("\"status\":\"" +
parseToJobStatus(item.getStatus()).name() + "\"");
+ result.append("},");
+ });
+ String resultStr = result.substring(0, result.length() - 1);
+ resultStr += "]}";
+
+ kafkaMsgProducer.sendJobStatusMessage(resultStr);
+ }
+ }
} catch (PersistentException e) {
logger.error("error change job:" + jobId + " to " + newStatus);
throw new RuntimeException(e);
}
}
+ private JobStatusEnum parseToJobStatus(ExecutableState state) {
+ switch (state) {
+ case READY:
+ return JobStatusEnum.PENDING;
+ case RUNNING:
+ return JobStatusEnum.RUNNING;
+ case ERROR:
+ return JobStatusEnum.ERROR;
+ case DISCARDED:
+ return JobStatusEnum.DISCARDED;
+ case SUCCEED:
+ return JobStatusEnum.FINISHED;
+ case STOPPED:
+ return JobStatusEnum.STOPPED;
+ default:
+ throw new RuntimeException("invalid state:" + state);
+ }
+ }
+
public void reloadAll() throws IOException {
executableDao.reloadAll();
}
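For a DefaultChainedExecutable with two steps, the string assembled in the status-update code above yields a message of roughly this shape (ids and names here are made up; status values are JobStatusEnum names produced by parseToJobStatus):

    {"jobId":"f1e2d3c4","jobName":"BUILD CUBE - sample_cube","status":"RUNNING","subTaskSize": "2","subTasks":[{"jobId":"f1e2d3c4-00","jobName":"Create Intermediate Flat Hive Table","status":"FINISHED"},{"jobId":"f1e2d3c4-01","jobName":"Build Cube","status":"RUNNING"}]}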