This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 17dcf01  KYLIN-4612 support job status writing to kafka
17dcf01 is described below

commit 17dcf010ca9f0e2fd85761b94419df4ee5d80b94
Author: chuxiao <crow...@163.com>
AuthorDate: Mon Jul 8 10:33:05 2019 +0800

    KYLIN-4612 support job status writing to kafka
---
 core-common/pom.xml                                |  5 ++
 .../org/apache/kylin/common/KylinConfigBase.java   |  8 ++
 .../kylin/common/kafka/KafkaMsgProducer.java       | 93 ++++++++++++++++++++++
 .../kylin/job/execution/ExecutableManager.java     | 58 ++++++++++++++
 4 files changed, 164 insertions(+)

diff --git a/core-common/pom.xml b/core-common/pom.xml
index 5ec2838..bff8797 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -73,6 +73,11 @@
             <artifactId>jsch</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+        </dependency>
+
         <!-- Env & Test -->
         <dependency>
             <groupId>junit</groupId>
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 56a7a3f..af3b4ae 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2678,4 +2678,12 @@ public abstract class KylinConfigBase implements 
Serializable {
     public int getDefaultTimeFilter() {
         return Integer.parseInt(getOptional("kylin.web.default-time-filter", 
"2"));
     }
+
+    public boolean jobStatusWriteKafka() {
+        return 
Boolean.parseBoolean(getOptional("kylin.engine.job-status.write.kafka", FALSE));
+    }
+
+    public Map<String, String> getJobStatusKafkaConfig() {
+        return getPropertiesByPrefix("kylin.engine.job-status.kafka.");
+    }
 }
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/kafka/KafkaMsgProducer.java 
b/core-common/src/main/java/org/apache/kylin/common/kafka/KafkaMsgProducer.java
new file mode 100644
index 0000000..98068a8
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/kafka/KafkaMsgProducer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaMsgProducer {
+    private static final Logger logger = 
LoggerFactory.getLogger(KafkaMsgProducer.class);
+
+    private static Producer<String, String> producer;
+    private static Map<String, String> kafkaConfig;
+    private static String TOPIC_NAME;
+
+
+    private static Properties kafkaProperties = new Properties() {
+        {
+            put(ProducerConfig.ACKS_CONFIG, "-1");
+            put(ProducerConfig.RETRIES_CONFIG, 3);
+            put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
+            put(ProducerConfig.LINGER_MS_CONFIG, 500);
+            put(ProducerConfig.BATCH_SIZE_CONFIG, 10000);
+            put("max.in.flight.requests.per.connection", 1);
+        }
+    };
+
+
+    private KafkaMsgProducer() {
+        init();
+    }
+
+    private static class BasicProducerHolder {
+        private static final KafkaMsgProducer INSTANCE = new 
KafkaMsgProducer();
+    }
+
+    public static final KafkaMsgProducer getInstance() {
+        return BasicProducerHolder.INSTANCE;
+    }
+
+    public void init() {
+        if (null == kafkaConfig) {
+            kafkaConfig = 
KylinConfig.getInstanceFromEnv().getJobStatusKafkaConfig();
+        }
+        if (null == producer) {
+            kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaConfig.get("bootstrap.servers"));
+            for(Map.Entry<String, String> entry : kafkaConfig.entrySet()){
+                kafkaProperties.put(entry.getKey(), entry.getValue());
+            }
+            producer = new KafkaProducer<>(kafkaProperties);
+        }
+        if (null == TOPIC_NAME) {
+            TOPIC_NAME = kafkaConfig.get("topic.name");
+        }
+    }
+
+    public void sendJobStatusMessage(String message) {
+        sendMessage(message);
+    }
+
+    private void sendMessage(final String message) {
+        ProducerRecord<String, String> record = new 
ProducerRecord<>(TOPIC_NAME, message);
+        producer.send(record, (recordMetadata, exception) -> {
+            if (null != exception) {
+                logger.error("kafka send message error.", exception);
+            }
+        });
+    }
+
+}
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 06f291b..791011c 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -32,8 +32,10 @@ import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.kafka.KafkaMsgProducer;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutableOutputPO;
 import org.apache.kylin.job.dao.ExecutablePO;
@@ -65,11 +67,15 @@ public class ExecutableManager {
 
     private final KylinConfig config;
     private final ExecutableDao executableDao;
+    private KafkaMsgProducer kafkaMsgProducer;
 
     private ExecutableManager(KylinConfig config) {
         logger.info("Using metadata url: " + config);
         this.config = config;
         this.executableDao = ExecutableDao.getInstance(config);
+        if (config.jobStatusWriteKafka()) {
+            this.kafkaMsgProducer = KafkaMsgProducer.getInstance();
+        }
     }
 
     private static ExecutablePO parse(AbstractExecutable executable) {
@@ -477,12 +483,64 @@ public class ExecutableManager {
             }
             executableDao.updateJobOutput(jobOutput);
             logger.info("job id:" + jobId + " from " + oldStatus + " to " + 
newStatus);
+
+            //write status to kafka
+            if (config.jobStatusWriteKafka()) {
+                AbstractExecutable executable = getJob(jobId);
+                if (executable == null) {
+                    return;
+                }
+                if (executable instanceof DefaultChainedExecutable) {
+                    StringBuffer result = new StringBuffer();
+
+                    DefaultChainedExecutable job = 
(DefaultChainedExecutable)executable;
+
+                    result.append("{");
+
+                    result.append("\"jobId\":\"" + job.getId() + "\",");
+                    result.append("\"jobName\":\"" + job.getName() + "\",");
+                    result.append("\"status\":\"" + 
parseToJobStatus(job.getStatus()).name() + "\",");
+                    result.append("\"subTaskSize\": \"" + 
job.getTasks().size() + "\",");
+
+                    result.append("\"subTasks\":[");
+                    job.getTasks().forEach(item -> {
+                        result.append("{");
+                        result.append("\"jobId\":\"" + item.getId() + "\",");
+                        result.append("\"jobName\":\"" + item.getName() + 
"\",");
+                        result.append("\"status\":\"" + 
parseToJobStatus(item.getStatus()).name() + "\"");
+                        result.append("},");
+                    });
+                    String resultStr = result.substring(0, result.length() - 
1);
+                    resultStr += "]}";
+
+                    kafkaMsgProducer.sendJobStatusMessage(resultStr);
+                }
+            }
         } catch (PersistentException e) {
             logger.error("error change job:" + jobId + " to " + newStatus);
             throw new RuntimeException(e);
         }
     }
 
+    private JobStatusEnum parseToJobStatus(ExecutableState state) {
+        switch (state) {
+            case READY:
+                return JobStatusEnum.PENDING;
+            case RUNNING:
+                return JobStatusEnum.RUNNING;
+            case ERROR:
+                return JobStatusEnum.ERROR;
+            case DISCARDED:
+                return JobStatusEnum.DISCARDED;
+            case SUCCEED:
+                return JobStatusEnum.FINISHED;
+            case STOPPED:
+                return JobStatusEnum.STOPPED;
+            default:
+                throw new RuntimeException("invalid state:" + state);
+        }
+    }
+
     public void reloadAll() throws IOException {
         executableDao.reloadAll();
     }

Reply via email to