This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f6e26d11b623b93a6de7b2ccb35f90b0e66f91e1
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Wed May 27 14:18:58 2020 +0800

    KYLIN-4528 Bad hbase region & irregular query detection
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  44 ++++--
 .../java/org/apache/kylin/common/QueryContext.java |  24 +++-
 .../kylin/common/util/MailTemplateProvider.java    |   5 +
 .../kylin/rest/service/MigrationService.java       |   4 +-
 .../kylin/rest/util/MailNotificationUtil.java      |   6 -
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        |  67 +++++++++
 .../storage/hbase/cube/v2/HBaseRPCHealthCheck.java | 152 +++++++++++++++++++++
 .../hbase/cube/v2/HBaseRPCHealthCheckTest.java     |  63 +++++++++
 8 files changed, 342 insertions(+), 23 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f2d423d..93cdcf9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -18,20 +18,6 @@
 
 package org.apache.kylin.common;
 
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-import org.apache.kylin.shaded.com.google.common.collect.Sets;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.text.StrSubstitutor;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.lock.DistributedLockFactory;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -49,6 +35,20 @@ import java.util.TimeZone;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.text.StrSubstitutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.lock.DistributedLockFactory;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * An abstract class to encapsulate access to a set of 'properties'.
  * Subclass can override methods in this class to extend the content of the 
'properties',
@@ -1497,6 +1497,22 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Integer.parseInt(getOptional("kylin.storage.hbase.max-hconnection-threads-per-project",
 "800"));
     }
 
+    /**
+     * Reads the {@code kylin.storage.hbase.notification-admin-emails} property as a string array.
+     *
+     * @return whatever {@code getOptionalStringArray} yields for that key; {@code null} is passed
+     *         as the default, so presumably {@code null} comes back when the property is unset
+     *         (verify against {@code getOptionalStringArray})
+     */
+    public String[] getHBaseAdminDls() {
+        final String[] hbaseAdminDls = getOptionalStringArray("kylin.storage.hbase.notification-admin-emails", null);
+        return hbaseAdminDls;
+    }
+
+    /**
+     * Reads the {@code kylin.storage.hbase.bad-region-detect-enabled} switch.
+     *
+     * @return the property parsed with {@link Boolean#parseBoolean(String)}; the default passed to
+     *         {@code getOptional} is {@code "true"}
+     */
+    public boolean isHBaseBadRegionDetectEnabled() {
+        final String enabled = getOptional("kylin.storage.hbase.bad-region-detect-enabled", "true");
+        return Boolean.parseBoolean(enabled);
+    }
+
+    /**
+     * Reads the {@code kylin.storage.hbase.bad-region-detect-threshold} property.
+     *
+     * @return the property parsed as an {@code int}; the default passed to {@code getOptional} is
+     *         {@code "15"}
+     * @throws NumberFormatException if the configured value is not a valid integer
+     */
+    public int getHBaseBadRegionDetectThreshold() {
+        final String threshold = getOptional("kylin.storage.hbase.bad-region-detect-threshold", "15");
+        return Integer.parseInt(threshold);
+    }
+
+    /**
+     * Reads the {@code kylin.storage.hbase.bad-region-detect-multiplier} property.
+     *
+     * @return the property parsed as an {@code int}; the default passed to {@code getOptional} is
+     *         {@code "10"}
+     * @throws NumberFormatException if the configured value is not a valid integer
+     */
+    public int getHBaseBadRegionDetectMultiplier() {
+        final String multiplier = getOptional("kylin.storage.hbase.bad-region-detect-multiplier", "10");
+        return Integer.parseInt(multiplier);
+    }
+
     // 
============================================================================
     // ENGINE.MR
     // 
============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index df2fa4d..8fed1f5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -82,6 +82,8 @@ public class QueryContext {
     private List<RPCStatistics> rpcStatisticsList = 
Lists.newCopyOnWriteArrayList();
     private Map<Integer, CubeSegmentStatisticsResult> 
cubeSegmentStatisticsResultMap = Maps.newConcurrentMap();
 
+    private ConcurrentMap<String, Boolean> isAlreadyAlert = 
Maps.newConcurrentMap();
+
     private ExecutorService connPool;
 
     QueryContext(String projectName, String sql, String user, int 
maxConnThreads) {
@@ -127,10 +129,18 @@ public class QueryContext {
         }
     }
 
+    /**
+     * Checks whether an alert was already recorded for this query and, in the same
+     * {@code putIfAbsent} call, records one if it was not.
+     * <p>
+     * Note the side effect: the first call returns {@code false} and stores the marker; every
+     * later call returns {@code true}.
+     *
+     * @return {@code true} if a marker for this query id was already present
+     */
+    public boolean isAlreadyAlert() {
+        // Key on getQueryId() instead of the raw field: it maps a null id to "", whereas a
+        // ConcurrentHashMap-style map rejects a null key with a NullPointerException.
+        return isAlreadyAlert.putIfAbsent(getQueryId(), true) != null;
+    }
+
     public String getQueryId() {
         return queryId == null ? "" : queryId;
     }
 
+    /**
+     * @return the value of this context's {@code sql} field
+     */
+    public String getSql() {
+        return this.sql;
+    }
+    
     public long getAccumulatedMillis() {
         return System.currentTimeMillis() - queryStartMillis;
     }
@@ -142,7 +152,7 @@ public class QueryContext {
     public String getProject() {
         return project;
     }
-
+    
     public Set<String> getGroups() {
         return groups;
     }
@@ -232,6 +242,18 @@ public class QueryContext {
         return tracer.activeSpan();
     }
 
+    /**
+     * Looks up a tag on the given span through the tracer and renders it as text.
+     * <p>
+     * The result goes through {@link String#valueOf}, so callers cannot detect a missing tag
+     * with a {@code null} check; presumably an absent tag comes back as the four-character
+     * text {@code "null"} (verify the return type of {@code tracer.getTagValue}).
+     *
+     * @param span    the span to inspect
+     * @param tagName the tag to read
+     * @return the tag value converted with {@code String.valueOf}
+     */
+    public String getSpanTagValue(Span span, String tagName) {
+        final String rendered = String.valueOf(tracer.getTagValue(span, tagName));
+        return rendered;
+    }
+
+    /**
+     * Returns the start of the given span as reported by {@code tracer.getStart}.
+     *
+     * @param span the span to inspect
+     * @return the tracer's start value for the span (unit not visible here; presumably epoch
+     *         milliseconds, to be confirmed against the tracer)
+     */
+    public long getSpanStart(Span span) {
+        final long startedAt = tracer.getStart(span);
+        return startedAt;
+    }
+
+    /**
+     * Returns the duration of the given span as reported by {@code tracer.getDuration}.
+     *
+     * @param span the span to inspect
+     * @return the tracer's duration value for the span (callers label it as milliseconds;
+     *         confirm against the tracer)
+     */
+    public long getSpanDuration(Span span) {
+        final long elapsed = tracer.getDuration(span);
+        return elapsed;
+    }
+    
     public Span startFetchCache() {
         return tracer.startSpan(OperationEum.FETCH_CACHE_STEP, rootSpan);
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MailTemplateProvider.java b/core-common/src/main/java/org/apache/kylin/common/util/MailTemplateProvider.java
index d403fbc..5c10cef 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/MailTemplateProvider.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/MailTemplateProvider.java
@@ -23,6 +23,7 @@ import java.io.Writer;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.shaded.com.google.common.base.Joiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +45,10 @@ public class MailTemplateProvider {
         return DEFAULT_INSTANCE;
     }
 
+    /**
+     * Builds a mail subject by wrapping every part in square brackets and joining them with
+     * dashes, e.g. {@code getMailTitle("QUERY ALERT", "WARN")} gives
+     * {@code "[QUERY ALERT]-[WARN]"}.
+     *
+     * @param titleParts the subject segments, in order
+     * @return the bracketed, dash-separated subject
+     */
+    public static String getMailTitle(String... titleParts) {
+        final StringBuilder title = new StringBuilder("[");
+        title.append(Joiner.on("]-[").join(titleParts));
+        return title.append("]").toString();
+    }
+
     private final Configuration configuration;
 
     private MailTemplateProvider() {
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java
index 67fce7e..6864882 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java
@@ -210,10 +210,10 @@ public class MigrationService extends BasicService {
 
         // No project name for rejected title
         if (state == MailNotificationUtil.MIGRATION_REJECTED) {
-            title = MailNotificationUtil.getMailTitle("MIGRATION", 
root.get("status"), root.get("envname"),
+            title = MailTemplateProvider.getMailTitle("MIGRATION", 
root.get("status"), root.get("envname"),
                     root.get("cubename"));
         } else {
-            title = MailNotificationUtil.getMailTitle("MIGRATION", 
root.get("status"), root.get("envname"),
+            title = MailTemplateProvider.getMailTitle("MIGRATION", 
root.get("status"), root.get("envname"),
                     root.get("projectname"), root.get("cubename"));
         }
 
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/util/MailNotificationUtil.java b/cube-migration/src/main/java/org/apache/kylin/rest/util/MailNotificationUtil.java
index 6ef4e28..44d90ed 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/util/MailNotificationUtil.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/util/MailNotificationUtil.java
@@ -18,16 +18,10 @@
 
 package org.apache.kylin.rest.util;
 
-import com.google.common.base.Joiner;
-
 public class MailNotificationUtil {
     public static final String MIGRATION_REQUEST = "MIGRATION_REQUEST";
     public static final String MIGRATION_REJECTED = "MIGRATION_REJECTED";
     public static final String MIGRATION_APPROVED = "MIGRATION_APPROVED";
     public static final String MIGRATION_COMPLETED = "MIGRATION_COMPLETED";
     public static final String MIGRATION_FAILED = "MIGRATION_FAILED";
-
-    public static String getMailTitle(String... titleParts) {
-        return "[" + Joiner.on("]-[").join(titleParts) + "]";
-    }
 }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 04fd26c..a810351 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -24,7 +24,9 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.DataFormatException;
 
 import org.apache.commons.lang3.SerializationUtils;
@@ -49,6 +51,7 @@ import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.CompressionUtils;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.LoggableCachedThreadPool;
 import org.apache.kylin.common.util.Pair;
@@ -67,6 +70,7 @@ import 
org.apache.kylin.shaded.com.google.common.cache.CacheBuilder;
 import org.apache.kylin.shaded.com.google.common.cache.CacheLoader;
 import org.apache.kylin.shaded.com.google.common.cache.LoadingCache;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
 import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
@@ -316,6 +320,9 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             final Table table = 
conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
                     queryContext.getConnectionPool(projThreadPool));
 
+            final AtomicLong fastest = new AtomicLong(3600 * 1000);
+            final long rpcRequestSubmitTime = System.currentTimeMillis();
+            final ConcurrentMap<CubeVisitResponse, Span> regionRPCSpanMap = 
Maps.newConcurrentMap();
             table.coprocessorService(CubeVisitService.class, startKey, endKey, 
//
                     new Batch.Call<CubeVisitService, CubeVisitResponse>() {
                         public CubeVisitResponse call(CubeVisitService 
rowsService) throws IOException {
@@ -379,6 +386,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                                     
regionRPCSpan.setTag(TagEnum.RPC_FREE_SWAP.toString(),
                                             
String.valueOf(stats.getFreeSwapSpaceSize()));
                                     
regionRPCSpan.setTag(TagEnum.RPC_ETC_MSG.toString(), stats.getEtcMsg());
+                                    regionRPCSpanMap.put(response, 
regionRPCSpan);
                                 }
                                 regionRPCSpan.finish();
                                 // Reset the interrupted state
@@ -463,10 +471,13 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                                 }
                             }
 
+                            Span regionRPCSpan = regionRPCSpanMap.get(result);
                             if (rpcException != null) {
                                 queryContext.stop(rpcException);
+                                alertException(epRangeSpan, regionRPCSpan, 
rpcException);
                                 return;
                             }
+                            alertSlowRegion(epRangeSpan, regionRPCSpan, stats, 
rpcRequestSubmitTime, fastest);
 
                             epResultItr.append(queueData);
                             // put segment query result to cache if cache is 
enabled
@@ -657,4 +668,60 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             col.exportData(out);
         }
     }
+
+    /**
+     * Sends an alert mail for an exception raised by a region RPC.
+     * <p>
+     * {@code queryContext.isAlreadyAlert()} is always consulted (which also marks the query as
+     * alerted). A {@link ResourceLimitExceededException} is only reported when no alert was
+     * recorded before; any other exception is reported unconditionally.
+     * <p>
+     * Recipient selection: resource-limit errors go to {@code KYLIN_USER}, timeouts to
+     * {@code KYLIN_HBASE}, everything else to {@code KYLIN_ADMIN}.
+     *
+     * @param epRangeSpan   span of the endpoint range query
+     * @param regionRPCSpan span of the failing region RPC
+     * @param rpcException  the failure to report
+     */
+    private void alertException(final Span epRangeSpan, final Span regionRPCSpan, Exception rpcException) {
+        final boolean resourceLimitHit = rpcException instanceof ResourceLimitExceededException;
+        final boolean alertedBefore = queryContext.isAlreadyAlert();
+        if (alertedBefore && resourceLimitHit) {
+            return;
+        }
+
+        final int alertRecipient;
+        if (resourceLimitHit) {
+            alertRecipient = HBaseRPCHealthCheck.KYLIN_USER;
+        } else {
+            alertRecipient = (rpcException instanceof KylinTimeoutException) //
+                    ? HBaseRPCHealthCheck.KYLIN_HBASE
+                    : HBaseRPCHealthCheck.KYLIN_ADMIN;
+        }
+        HBaseRPCHealthCheck.alert(queryContext, epRangeSpan, regionRPCSpan, rpcException, alertRecipient);
+    }
+
+    /**
+     * Detects an abnormally slow region RPC and, at most once per query, mails an alert to the
+     * HBase recipients.
+     * <p>
+     * The RPC duration is first folded into {@code fastest} (the smallest duration seen so far).
+     * An alert is raised when the duration exceeds {@code 1000 * threshold} and is also larger
+     * than {@code multiplier} times either the fastest RPC or the server-side service time
+     * ({@code serviceEndTime - serviceStartTime}). Durations are presumably milliseconds and the
+     * threshold seconds, given the factor of 1000 (confirm against the tracer).
+     *
+     * @param epRangeSpan          span of the endpoint range query
+     * @param regionRPCSpan        span of the region RPC; when {@code null} nothing is evaluated
+     * @param stats                server-side statistics returned with the response
+     * @param rpcRequestSubmitTime wall-clock time at which the coprocessor request was submitted
+     * @param fastest              shared holder of the smallest RPC duration observed so far
+     */
+    private void alertSlowRegion(final Span epRangeSpan, final Span regionRPCSpan, final Stats stats,
+            long rpcRequestSubmitTime, AtomicLong fastest) {
+        if (regionRPCSpan == null) {
+            // The span is looked up from a map by the caller and may be missing; without it
+            // there is no duration to judge, and a bogus value must not pollute 'fastest'.
+            return;
+        }
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+        long rpcStartTime = queryContext.getSpanStart(regionRPCSpan);
+        long rpcDuration = queryContext.getSpanDuration(regionRPCSpan);
+
+        logger.info("Query-{}: regionRPCSpan spend time {}.", queryContext.getQueryId(), rpcDuration);
+
+        // Lower 'fastest' to this duration if it is smaller; retry when another thread wins the CAS.
+        long fastTime = fastest.get();
+        while (rpcDuration < fastTime && !fastest.compareAndSet(fastTime, rpcDuration)) {
+            fastTime = fastest.get();
+        }
+
+        // 1000L forces long arithmetic so a large configured threshold cannot overflow int.
+        final long thresholdMillis = 1000L * config.getHBaseBadRegionDetectThreshold();
+        if (rpcDuration <= thresholdMillis) {
+            return;
+        }
+
+        final long multiplier = config.getHBaseBadRegionDetectMultiplier();
+        final boolean slow = rpcDuration > multiplier * fastest.get()
+                || rpcDuration > multiplier * (stats.getServiceEndTime() - stats.getServiceStartTime());
+        if (!slow) {
+            return;
+        }
+        // Test-and-set: only the first detection of a query sends a mail.
+        if (queryContext.isAlreadyAlert()) {
+            return;
+        }
+
+        StringBuilder alerts = new StringBuilder();
+        alerts.append("Slow region detected at: ")
+                .append(DateFormat.formatToTimeWithoutMilliStr(System.currentTimeMillis())).append(". \n");
+        alerts.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". \n");
+        alerts.append("Total filtered row: ").append(stats.getFilteredRowCount()).append(". \n");
+        alerts.append("Total aggregated row: ").append(stats.getAggregatedRowCount()).append(". \n");
+        alerts.append("RPC task start latency: ").append(rpcStartTime - rpcRequestSubmitTime).append(". \n");
+        alerts.append("RPC client call start time: ").append(rpcStartTime).append(". \n");
+        alerts.append("RPC client call end time: ").append((rpcStartTime + rpcDuration)).append(". \n");
+        alerts.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(". \n");
+        alerts.append("Server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(". \n");
+        alerts.append("Server swap mem left: ").append(stats.getFreeSwapSpaceSize()).append(". \n");
+        alerts.append("Etc message: ").append(stats.getEtcMsg()).append(". \n");
+        alerts.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(". \n");
+        HBaseRPCHealthCheck.alert(queryContext, epRangeSpan, regionRPCSpan, alerts.toString(),
+                HBaseRPCHealthCheck.KYLIN_HBASE);
+    }
 }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseRPCHealthCheck.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseRPCHealthCheck.java
new file mode 100755
index 0000000..f9ef503
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseRPCHealthCheck.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
+import org.apache.kylin.common.tracer.TracerConstants.TagEnum;
+import org.apache.kylin.common.util.MailService;
+import org.apache.kylin.common.util.MailTemplateProvider;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.common.util.ToolUtil;
+import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.opentracing.Span;
+
+/**
+ * Sends alert mails about failing or slow HBase region RPCs observed while serving a query.
+ * <p>
+ * Every public {@code alert} overload funnels into {@link #alertEmail}, which is a no-op unless
+ * {@code KylinConfig.isHBaseBadRegionDetectEnabled()} is on, and which logs (never propagates)
+ * any exception raised while building or sending the mail.
+ */
+public class HBaseRPCHealthCheck {
+    public static final Logger logger = LoggerFactory.getLogger(HBaseRPCHealthCheck.class);
+
+    /** Template key passed to {@code MailTemplateProvider.buildMailContent}. */
+    public static final String QUERY_ALERT = "QUERY_ALERT";
+
+    /** Recipient selector: only the Kylin admin list. */
+    public static final int KYLIN_ADMIN = 1;
+    /** Recipient selector: the Kylin admin list plus the user who submitted the query. */
+    public static final int KYLIN_USER = 2;
+    /** Recipient selector: the Kylin admin list plus the HBase admin list. */
+    public static final int KYLIN_HBASE = 3;
+
+    /** Placeholder shown in the mail for values that are not available. */
+    public static final String NA = "NA";
+
+    /**
+     * Sends a {@code WARN} alert with a free-text reason.
+     *
+     * @param queryContext  context of the affected query
+     * @param epRangeSpan   span of the endpoint range query
+     * @param regionRPCSpan span of the region RPC the alert is about
+     * @param reason        text placed in the mail as the alert reason
+     * @param recipient     one of {@link #KYLIN_ADMIN}, {@link #KYLIN_USER}, {@link #KYLIN_HBASE}
+     */
+    public static void alert(QueryContext queryContext, Span epRangeSpan, Span regionRPCSpan, String reason,
+            int recipient) {
+        alertEmail(queryContext, epRangeSpan, regionRPCSpan, reason, "WARN", recipient);
+    }
+
+    /**
+     * Sends an alert whose reason is the stack trace of {@code exception}. The state is
+     * {@code WARN} for a {@link ResourceLimitExceededException} and {@code ERROR} otherwise.
+     *
+     * @param queryContext  context of the affected query
+     * @param epRangeSpan   span of the endpoint range query
+     * @param regionRPCSpan span of the region RPC the alert is about
+     * @param exception     the failure to report
+     * @param recipient     one of {@link #KYLIN_ADMIN}, {@link #KYLIN_USER}, {@link #KYLIN_HBASE}
+     */
+    public static void alert(QueryContext queryContext, Span epRangeSpan, Span regionRPCSpan, Exception exception,
+            int recipient) {
+        StringWriter out = new StringWriter();
+        // Capture the stack trace as text for the mail body; nothing is printed to stderr.
+        exception.printStackTrace(new PrintWriter(out));
+        String state = (exception instanceof ResourceLimitExceededException) ? "WARN" : "ERROR";
+        alertEmail(queryContext, epRangeSpan, regionRPCSpan, out.toString(), state, recipient);
+    }
+
+    /**
+     * Resolves the recipients, renders the mail and sends it. Does nothing when bad-region
+     * detection is disabled or when no recipient is configured.
+     */
+    private static void alertEmail(QueryContext queryContext, Span epRangeSpan, Span regionRPCSpan, String alertReason,
+            String state, int recipient) {
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        if (!kylinConfig.isHBaseBadRegionDetectEnabled()) {
+            return;
+        }
+
+        try {
+            Set<String> users = Sets.newHashSet();
+            addAll(users, kylinConfig.getAdminDls());
+
+            if (recipient == KYLIN_USER) {
+                // Skip a missing/blank submitter so it never ends up as a mail address.
+                final String submitter = queryContext.getUsername();
+                if (submitter != null && !submitter.trim().isEmpty()) {
+                    users.add(submitter);
+                }
+            } else if (recipient == KYLIN_HBASE) {
+                addAll(users, kylinConfig.getHBaseAdminDls());
+            }
+
+            if (users.isEmpty()) {
+                logger.warn("no need to send email, user list is empty");
+                return;
+            }
+            final Pair<String, String> email = formatNotifications(queryContext, epRangeSpan, regionRPCSpan,
+                    alertReason, state);
+            if (email == null) {
+                logger.warn("no need to send email, content is null");
+                return;
+            }
+            new MailService(kylinConfig).sendMail(Lists.newArrayList(users), email.getFirst(), email.getSecond());
+        } catch (Exception e) {
+            // Alerting is best-effort: a mail problem must not fail the query path.
+            logger.error("error send email", e);
+        }
+    }
+
+    /** Adds every entry of {@code addresses} to {@code target}; a {@code null} array is ignored. */
+    private static void addAll(Set<String> target, String[] addresses) {
+        if (addresses == null) {
+            return;
+        }
+        for (String address : addresses) {
+            target.add(address);
+        }
+    }
+
+    /**
+     * Reads a span tag for display, falling back to {@link #NA} when it is absent.
+     * {@code QueryContext.getSpanTagValue} stringifies its result, so a missing tag may arrive
+     * as the text {@code "null"} rather than as {@code null}; both are treated as absent.
+     */
+    private static String tagValueOrNA(QueryContext queryContext, Span span, String tagName) {
+        final String value = queryContext.getSpanTagValue(span, tagName);
+        return (value == null || "null".equals(value)) ? NA : value;
+    }
+
+    /**
+     * Builds the title and body of the alert mail.
+     *
+     * @param queryContext  context of the affected query
+     * @param epRangeSpan   span carrying the cube, segment and HTable tags
+     * @param regionRPCSpan span carrying the region-server and RPC-duration tags
+     * @param alertReason   reason text; {@code "no error message"} is used when blank
+     * @param state         severity label placed in the title (e.g. {@code WARN}, {@code ERROR})
+     * @return a pair of (title, content)
+     */
+    @VisibleForTesting
+    static Pair<String, String> formatNotifications(QueryContext queryContext, Span epRangeSpan, Span regionRPCSpan,
+            String alertReason, String state) {
+        Map<String, Object> root = Maps.newHashMap();
+        String cube = queryContext.getSpanTagValue(epRangeSpan, TagEnum.CUBE.toString());
+        root.put("env_name", KylinConfig.getInstanceFromEnv().getDeployEnv());
+        root.put("submitter", StringUtil.noBlank(queryContext.getUsername(), "UNKNOWN"));
+        root.put("query_engine", ToolUtil.getHostName());
+        root.put("project_name", queryContext.getProject());
+        root.put("sql", queryContext.getSql());
+        root.put("queryId", queryContext.getQueryId());
+        root.put("cube_name", cube);
+        root.put("segment_name", tagValueOrNA(queryContext, epRangeSpan, TagEnum.SEGMENT.toString()));
+        root.put("htable", tagValueOrNA(queryContext, epRangeSpan, TagEnum.HTABLE.toString()));
+        root.put("region_server", tagValueOrNA(queryContext, regionRPCSpan, TagEnum.REGION_SERVER.toString()));
+        root.put("rpc_duration", queryContext.getSpanDuration(regionRPCSpan) + "(ms)");
+        root.put("kylin_ep_duration", tagValueOrNA(queryContext, regionRPCSpan, TagEnum.RPC_DURATION.toString()));
+
+        root.put("alert_reason", Matcher.quoteReplacement(StringUtil.noBlank(alertReason, "no error message")));
+
+        String content = MailTemplateProvider.getInstance().buildMailContent(QUERY_ALERT, root);
+        String title = MailTemplateProvider.getMailTitle("QUERY ALERT", state,
+                KylinConfig.getInstanceFromEnv().getDeployEnv(), queryContext.getProject(), cube);
+        return Pair.newPair(title, content);
+    }
+}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/HBaseRPCHealthCheckTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/HBaseRPCHealthCheckTest.java
new file mode 100755
index 0000000..47fa157
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/HBaseRPCHealthCheckTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.opentracing.Span;
+
+/**
+ * Checks the mail title produced by {@code HBaseRPCHealthCheck.formatNotifications} for a query
+ * with one endpoint-range span and one region-RPC span.
+ */
+public class HBaseRPCHealthCheckTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testFormatEmail() {
+        QueryContext queryContext = QueryContextFacade.startQuery("testProject", "select count(*) from testTable",
+                "unittest");
+        Span epRangeSpan = queryContext.startEPRangeQuerySpan("testRange", "testCube", "testSegment", "testHTable", 111,
+                11111, "no");
+        Span regionRPCSpan = queryContext.startRegionRPCSpan("testRegionServer", epRangeSpan);
+        // No artificial delay: nothing asserted below depends on the span's duration.
+        regionRPCSpan.finish();
+        epRangeSpan.finish();
+        Pair<String, String> result = HBaseRPCHealthCheck.formatNotifications(queryContext, epRangeSpan, regionRPCSpan,
+                "slow region detected.", "WARN");
+        // JUnit order is (expected, actual) so a failure message reads correctly.
+        Assert.assertEquals("[QUERY ALERT]-[WARN]-[DEV]-[testProject]-[testCube]", result.getFirst());
+        // The rendered body is only printed for manual inspection, not asserted.
+        System.out.println(result.getSecond());
+    }
+}

Reply via email to