This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 8e98b8b8807929cb7bbdf8f52bbb98383c254a33
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Sun Sep 30 08:55:02 2018 +0800

    KYLIN-3560 Should not depend on personal repository
---
 LICENSE                                            |   7 +-
 core-common/pom.xml                                |  12 +-
 .../metrics/metrics2/HadoopMetrics2Reporter.java   | 495 +++++++++++++++++++++
 .../common/metrics/metrics2/Metrics2Reporter.java  |   1 -
 .../metrics2/HadoopMetrics2ReporterTest.java       | 415 +++++++++++++++++
 .../common/metrics/metrics2/StandaloneExample.java | 114 +++++
 6 files changed, 1036 insertions(+), 8 deletions(-)

diff --git a/LICENSE b/LICENSE
index a237c61..26e015c 100644
--- a/LICENSE
+++ b/LICENSE
@@ -265,5 +265,8 @@ RocksDB is dual-licensed under both the GPLv2 and Apache 
2.0 License.You may
 select, at your option, one of the above-listed licenses.
 https://github.com/facebook/rocksdb/blob/master/LICENSE.Apache
 
-
-
+==============================================================================
+For HadoopMetrics2Reporter:
+==============================================================================
+HadoopMetrics2Reporter is licensed under Apache 2.0 License.
+https://github.com/joshelser/dropwizard-hadoop-metrics2
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 3b5f7fc..594e39b 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -87,11 +87,6 @@
         </dependency>
 
         <dependency>
-            <groupId>com.github.joshelser</groupId>
-            
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
-            <version>0.1.2</version>
-        </dependency>
-        <dependency>
             <groupId>commons-dbcp</groupId>
             <artifactId>commons-dbcp</artifactId>
             <version>1.4</version>
@@ -101,5 +96,12 @@
             <artifactId>mysql-connector-java</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/HadoopMetrics2Reporter.java
 
b/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/HadoopMetrics2Reporter.java
new file mode 100644
index 0000000..c0e48a6
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/HadoopMetrics2Reporter.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.metrics.metrics2;
+
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+
+/**
+ * Modified from https://github.com/joshelser/dropwizard-hadoop-metrics2, 
Copyright by Josh Elser
+ *
+ * A {@link com.codahale.metrics.Reporter} which also acts as a Hadoop Metrics2
+ * {@link MetricsSource}. Configure it like other Reporters.
+ *
+ * <pre>
+ * final HadoopMetrics2Reporter metrics2Reporter = 
HadoopMetrics2Reporter.forRegistry(metrics)
+ *     .build(DefaultMetricsSystem.initialize("Phoenix"), // The 
application-level name
+ *            "QueryServer", // Component name
+ *            "Phoenix Query Server", // Component description
+ *            "General"); // Name for each metric record
+ *
+ * metrics2Reporter.start(30, TimeUnit.SECONDS);
+ * </pre>
+ */
+public class HadoopMetrics2Reporter extends ScheduledReporter implements 
MetricsSource {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HadoopMetrics2Reporter.class);
+    private static final String EMPTY_STRING = "";
+    @SuppressWarnings("rawtypes")
+    private static final SortedMap<String, Gauge> EMPTY_GAUGE_MAP = Collections
+            .unmodifiableSortedMap(new TreeMap<String, Gauge>());
+    private static final SortedMap<String, Meter> EMPTY_METER_MAP = Collections
+            .unmodifiableSortedMap(new TreeMap<String, Meter>());
+    private static final SortedMap<String, Timer> EMPTY_TIMER_MAP = Collections
+            .unmodifiableSortedMap(new TreeMap<String, Timer>());
+    private static final SortedMap<String, Counter> EMPTY_COUNTER_MAP = 
Collections
+            .unmodifiableSortedMap(new TreeMap<String, Counter>());
+    private static final SortedMap<String, Histogram> EMPTY_HISTOGRAM_MAP = 
Collections
+            .unmodifiableSortedMap(new TreeMap<String, Histogram>());
+
+    public static final MetricsInfo RATE_UNIT_LABEL = 
Interns.info("rate_unit", "The unit of measure for rate metrics");
+    public static final MetricsInfo DURATION_UNIT_LABEL = 
Interns.info("duration_unit",
+            "The unit of measure of duration metrics");
+
+    /**
+     * Returns a new {@link Builder} for {@link HadoopMetrics2Reporter}.
+     *
+     * @param registry the registry to report
+     * @return a {@link Builder} instance for a {@link HadoopMetrics2Reporter}
+     */
+    public static Builder forRegistry(MetricRegistry registry) {
+        return new Builder(registry);
+    }
+
+    /**
+     * A builder to create {@link HadoopMetrics2Reporter} instances.
+     */
+    public static class Builder {
+        private final MetricRegistry registry;
+        private MetricFilter filter;
+        private TimeUnit rateUnit;
+        private TimeUnit durationUnit;
+        private String recordContext;
+
+        private Builder(MetricRegistry registry) {
+            this.registry = registry;
+            this.filter = MetricFilter.ALL;
+            this.rateUnit = TimeUnit.SECONDS;
+            this.durationUnit = TimeUnit.MILLISECONDS;
+        }
+
+        /**
+         * Convert rates to the given time unit. Defaults to {@link 
TimeUnit#SECONDS}.
+         *
+         * @param rateUnit a unit of time
+         * @return {@code this}
+         */
+        public Builder convertRatesTo(TimeUnit rateUnit) {
+            this.rateUnit = Objects.requireNonNull(rateUnit);
+            return this;
+        }
+
+        /**
+         * Convert durations to the given time unit. Defaults to {@link 
TimeUnit#MILLISECONDS}.
+         *
+         * @param durationUnit a unit of time
+         * @return {@code this}
+         */
+        public Builder convertDurationsTo(TimeUnit durationUnit) {
+            this.durationUnit = Objects.requireNonNull(durationUnit);
+            return this;
+        }
+
+        /**
+         * Only report metrics which match the given filter. Defaults to 
{@link MetricFilter#ALL}.
+         *
+         * @param filter a {@link MetricFilter}
+         * @return {@code this}
+         */
+        public Builder filter(MetricFilter filter) {
+            this.filter = Objects.requireNonNull(filter);
+            return this;
+        }
+
+        /**
+         * A "context" name that will be added as a tag on each emitted metric 
record. Defaults to
+         * no "context" attribute on each record.
+         *
+         * @param recordContext The "context" tag
+         * @return {@code this}
+         */
+        public Builder recordContext(String recordContext) {
+            this.recordContext = Objects.requireNonNull(recordContext);
+            return this;
+        }
+
+        /**
+         * Builds a {@link HadoopMetrics2Reporter} with the given properties, 
making metrics available
+         * to the Hadoop Metrics2 framework (any configured {@link 
MetricsSource}s.
+         *
+         * @param metrics2System The Hadoop Metrics2 system instance.
+         * @param jmxContext The JMX "path", e.g. {@code 
"MyServer,sub=Requests"}.
+         * @param description A description these metrics.
+         * @param recordName A suffix included on each record to identify it.
+         *
+         * @return a {@link HadoopMetrics2Reporter}
+         */
+        public HadoopMetrics2Reporter build(MetricsSystem metrics2System, 
String jmxContext, String description,
+                String recordName) {
+            return new HadoopMetrics2Reporter(registry, rateUnit, 
durationUnit, filter, metrics2System,
+                    Objects.requireNonNull(jmxContext), description, 
recordName, recordContext);
+        }
+    }
+
+    private final MetricsRegistry metrics2Registry;
+    private final MetricsSystem metrics2System;
+    private final String recordName;
+    private final String context;
+
+    // TODO Adding to the queues and removing from them are now guarded by 
explicit synchronization
+    // so these don't need to be safe for concurrency anymore.
+    @SuppressWarnings("rawtypes")
+    private SortedMap<String, Gauge> dropwizardGauges;
+    private SortedMap<String, Counter> dropwizardCounters;
+    private SortedMap<String, Histogram> dropwizardHistograms;
+    private SortedMap<String, Meter> dropwizardMeters;
+    private SortedMap<String, Timer> dropwizardTimers;
+
+    private HadoopMetrics2Reporter(MetricRegistry registry, TimeUnit rateUnit, 
TimeUnit durationUnit,
+            MetricFilter filter, MetricsSystem metrics2System, String 
jmxContext, String description, String recordName,
+            String context) {
+        super(registry, "hadoop-metrics2-reporter", filter, rateUnit, 
durationUnit);
+        this.metrics2Registry = new MetricsRegistry(Interns.info(jmxContext, 
description));
+        this.metrics2System = metrics2System;
+        this.recordName = recordName;
+        this.context = context;
+
+        // These could really be Collection.emptyMap(), but this makes testing 
a bit easier.
+        this.dropwizardGauges = EMPTY_GAUGE_MAP;
+        this.dropwizardCounters = EMPTY_COUNTER_MAP;
+        this.dropwizardHistograms = EMPTY_HISTOGRAM_MAP;
+        this.dropwizardMeters = EMPTY_METER_MAP;
+        this.dropwizardTimers = EMPTY_TIMER_MAP;
+
+        // Register this source with the Metrics2 system.
+        // Make sure this is the last thing done as getMetrics() can be called 
at any time after.
+        this.metrics2System.register(Objects.requireNonNull(jmxContext), 
Objects.requireNonNull(description), this);
+    }
+
+    @Override
+    public void getMetrics(MetricsCollector collector, boolean all) {
+        MetricsRecordBuilder builder = collector.addRecord(recordName);
+        if (null != context) {
+            builder.setContext(context);
+        }
+
+        // Synchronizing here ensures that the dropwizard metrics collection 
side is excluded from executing
+        // at the same time we are pulling elements from the queues.
+        synchronized (this) {
+            snapshotAllMetrics(builder);
+        }
+
+        metrics2Registry.snapshot(builder, all);
+    }
+
+    /**
+     * Consumes the current metrics collected by dropwizard and adds them to 
the {@code builder}.
+     *
+     * @param builder A record builder
+     */
+    void snapshotAllMetrics(MetricsRecordBuilder builder) {
+        try {
+            // Pass through the gauges
+            @SuppressWarnings("rawtypes")
+            Iterator<Entry<String, Gauge>> gaugeIterator = 
dropwizardGauges.entrySet().iterator();
+            while (gaugeIterator.hasNext()) {
+                @SuppressWarnings("rawtypes")
+                Entry<String, Gauge> gauge = gaugeIterator.next();
+                final MetricsInfo info = Interns.info(gauge.getKey(), 
EMPTY_STRING);
+                final Object o = gauge.getValue().getValue();
+
+                // Figure out which gauge types metrics2 supports and call the 
right method
+                if (o instanceof Integer) {
+                    builder.addGauge(info, (int) o);
+                } else if (o instanceof Long) {
+                    builder.addGauge(info, (long) o);
+                } else if (o instanceof Float) {
+                    builder.addGauge(info, (float) o);
+                } else if (o instanceof Double) {
+                    builder.addGauge(info, (double) o);
+                } else {
+                    LOG.trace("Ignoring Gauge ({}) with unhandled type: {}", 
gauge.getKey(), o.getClass());
+                }
+            }
+        } finally {
+            dropwizardGauges = EMPTY_GAUGE_MAP;
+        }
+
+        try {
+            // Pass through the counters
+            Iterator<Entry<String, Counter>> counterIterator = 
dropwizardCounters.entrySet().iterator();
+            while (counterIterator.hasNext()) {
+                Entry<String, Counter> counter = counterIterator.next();
+                MetricsInfo info = Interns.info(counter.getKey(), 
EMPTY_STRING);
+                builder.addCounter(info, counter.getValue().getCount());
+            }
+        } finally {
+            dropwizardCounters = EMPTY_COUNTER_MAP;
+        }
+
+        try {
+            // Pass through the histograms
+            Iterator<Entry<String, Histogram>> histogramIterator = 
dropwizardHistograms.entrySet().iterator();
+            while (histogramIterator.hasNext()) {
+                final Entry<String, Histogram> entry = 
histogramIterator.next();
+                final String name = entry.getKey();
+                final Histogram histogram = entry.getValue();
+
+                addSnapshot(builder, name, EMPTY_STRING, 
histogram.getSnapshot(), histogram.getCount());
+            }
+        } finally {
+            dropwizardHistograms = EMPTY_HISTOGRAM_MAP;
+        }
+
+        try {
+            // Pass through the meter values
+            Iterator<Entry<String, Meter>> meterIterator = 
dropwizardMeters.entrySet().iterator();
+            while (meterIterator.hasNext()) {
+                final Entry<String, Meter> meterEntry = meterIterator.next();
+                final String name = meterEntry.getKey();
+                final Meter meter = meterEntry.getValue();
+
+                addMeter(builder, name, EMPTY_STRING, meter.getCount(), 
meter.getMeanRate(), meter.getOneMinuteRate(),
+                        meter.getFiveMinuteRate(), 
meter.getFifteenMinuteRate());
+            }
+        } finally {
+            dropwizardMeters = EMPTY_METER_MAP;
+        }
+
+        try {
+            // Pass through the timers (meter + histogram)
+            Iterator<Entry<String, Timer>> timerIterator = 
dropwizardTimers.entrySet().iterator();
+            while (timerIterator.hasNext()) {
+                final Entry<String, Timer> timerEntry = timerIterator.next();
+                final String name = timerEntry.getKey();
+                final Timer timer = timerEntry.getValue();
+                final Snapshot snapshot = timer.getSnapshot();
+
+                // Add the meter info (mean rate and rate over time windows)
+                addMeter(builder, name, EMPTY_STRING, timer.getCount(), 
timer.getMeanRate(), timer.getOneMinuteRate(),
+                        timer.getFiveMinuteRate(), 
timer.getFifteenMinuteRate());
+
+                // Count was already added via the meter
+                addSnapshot(builder, name, EMPTY_STRING, snapshot);
+            }
+        } finally {
+            dropwizardTimers = EMPTY_TIMER_MAP;
+        }
+
+        // Add in metadata about what the units the reported metrics are 
displayed using.
+        builder.tag(RATE_UNIT_LABEL, getRateUnit());
+        builder.tag(DURATION_UNIT_LABEL, getDurationUnit());
+    }
+
+    /**
+     * Add Dropwizard-Metrics rate information to a Hadoop-Metrics2 record 
builder, converting the
+     * rates to the appropriate unit.
+     *
+     * @param builder A Hadoop-Metrics2 record builder.
+     * @param name A base name for this record.
+     * @param desc A description for the record.
+     * @param count The number of measured events.
+     * @param meanRate The average measured rate.
+     * @param oneMinuteRate The measured rate over the past minute.
+     * @param fiveMinuteRate The measured rate over the past five minutes
+     * @param fifteenMinuteRate The measured rate over the past fifteen 
minutes.
+     */
+    private void addMeter(MetricsRecordBuilder builder, String name, String 
desc, long count, double meanRate,
+            double oneMinuteRate, double fiveMinuteRate, double 
fifteenMinuteRate) {
+        builder.addGauge(Interns.info(name + "_count", EMPTY_STRING), count);
+        builder.addGauge(Interns.info(name + "_mean_rate", EMPTY_STRING), 
convertRate(meanRate));
+        builder.addGauge(Interns.info(name + "_1min_rate", EMPTY_STRING), 
convertRate(oneMinuteRate));
+        builder.addGauge(Interns.info(name + "_5min_rate", EMPTY_STRING), 
convertRate(fiveMinuteRate));
+        builder.addGauge(Interns.info(name + "_15min_rate", EMPTY_STRING), 
convertRate(fifteenMinuteRate));
+    }
+
+    /**
+     * Add Dropwizard-Metrics value-distribution data to a Hadoop-Metrics2 
record building, converting
+     * the durations to the appropriate unit.
+     *
+     * @param builder A Hadoop-Metrics2 record builder.
+     * @param name A base name for this record.
+     * @param desc A description for this record.
+     * @param snapshot The distribution of measured values.
+     * @param count The number of values which were measured.
+     */
+    private void addSnapshot(MetricsRecordBuilder builder, String name, String 
desc, Snapshot snapshot, long count) {
+        builder.addGauge(Interns.info(name + "_count", desc), count);
+        addSnapshot(builder, name, desc, snapshot);
+    }
+
+    /**
+     * Add Dropwizard-Metrics value-distribution data to a Hadoop-Metrics2 
record building, converting
+     * the durations to the appropriate unit.
+     *
+     * @param builder A Hadoop-Metrics2 record builder.
+     * @param name A base name for this record.
+     * @param desc A description for this record.
+     * @param snapshot The distribution of measured values.
+     */
+    private void addSnapshot(MetricsRecordBuilder builder, String name, String 
desc, Snapshot snapshot) {
+        builder.addGauge(Interns.info(name + "_mean", desc), 
convertDuration(snapshot.getMean()));
+        builder.addGauge(Interns.info(name + "_min", desc), 
convertDuration(snapshot.getMin()));
+        builder.addGauge(Interns.info(name + "_max", desc), 
convertDuration(snapshot.getMax()));
+        builder.addGauge(Interns.info(name + "_median", desc), 
convertDuration(snapshot.getMedian()));
+        builder.addGauge(Interns.info(name + "_stddev", desc), 
convertDuration(snapshot.getStdDev()));
+
+        builder.addGauge(Interns.info(name + "_75thpercentile", desc), 
convertDuration(snapshot.get75thPercentile()));
+        builder.addGauge(Interns.info(name + "_95thpercentile", desc), 
convertDuration(snapshot.get95thPercentile()));
+        builder.addGauge(Interns.info(name + "_98thpercentile", desc), 
convertDuration(snapshot.get98thPercentile()));
+        builder.addGauge(Interns.info(name + "_99thpercentile", desc), 
convertDuration(snapshot.get99thPercentile()));
+        builder.addGauge(Interns.info(name + "_999thpercentile", desc), 
convertDuration(snapshot.get999thPercentile()));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void report(SortedMap<String, Gauge> gauges, SortedMap<String, 
Counter> counters,
+            SortedMap<String, Histogram> histograms, SortedMap<String, Meter> 
meters, SortedMap<String, Timer> timers) {
+        // ScheduledReporter is synchronizing on `this`, so we don't have to 
worry about concurrent
+        // invocations of reporter causing trouble. All of the maps provided 
to us are umodifiable
+        // copies of the actual metric objects. We can just hold onto them.
+        dropwizardGauges = gauges;
+        dropwizardCounters = counters;
+        dropwizardHistograms = histograms;
+        dropwizardMeters = meters;
+        dropwizardTimers = timers;
+    }
+
+    @Override
+    protected String getRateUnit() {
+        // Make it "events per rate_unit" to be accurate.
+        return "events/" + super.getRateUnit();
+    }
+
+    @Override
+    protected String getDurationUnit() {
+        // Make it visible to the tests
+        return super.getDurationUnit();
+    }
+
+    @Override
+    protected double convertDuration(double duration) {
+        // Make it visible to the tests
+        return super.convertDuration(duration);
+    }
+
+    @Override
+    protected double convertRate(double rate) {
+        // Make it visible to the tests
+        return super.convertRate(rate);
+    }
+
+    // Getters visible for testing
+
+    MetricsRegistry getMetrics2Registry() {
+        return metrics2Registry;
+    }
+
+    MetricsSystem getMetrics2System() {
+        return metrics2System;
+    }
+
+    String getRecordName() {
+        return recordName;
+    }
+
+    String getContext() {
+        return context;
+    }
+
+    @SuppressWarnings("rawtypes")
+    SortedMap<String, Gauge> getDropwizardGauges() {
+        return dropwizardGauges;
+    }
+
+    void setDropwizardGauges(@SuppressWarnings("rawtypes") SortedMap<String, 
Gauge> gauges) {
+        this.dropwizardGauges = Objects.requireNonNull(gauges);
+    }
+
+    SortedMap<String, Counter> getDropwizardCounters() {
+        return dropwizardCounters;
+    }
+
+    void setDropwizardCounters(SortedMap<String, Counter> counters) {
+        this.dropwizardCounters = counters;
+    }
+
+    SortedMap<String, Histogram> getDropwizardHistograms() {
+        return dropwizardHistograms;
+    }
+
+    void setDropwizardHistograms(SortedMap<String, Histogram> histograms) {
+        this.dropwizardHistograms = histograms;
+    }
+
+    SortedMap<String, Meter> getDropwizardMeters() {
+        return dropwizardMeters;
+    }
+
+    void setDropwizardMeters(SortedMap<String, Meter> meters) {
+        this.dropwizardMeters = meters;
+    }
+
+    SortedMap<String, Timer> getDropwizardTimers() {
+        return dropwizardTimers;
+    }
+
+    void setDropwizardTimers(SortedMap<String, Timer> timers) {
+        this.dropwizardTimers = timers;
+    }
+
+    protected void printQueueDebugMessage() {
+        StringBuilder sb = new StringBuilder(64);
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss", 
Locale.ROOT);
+        sb.append(sdf.format(new Date())).append(" 
================================\n");
+        sb.append("\n  Dropwizard gauges size: 
").append(getDropwizardGauges().size());
+        sb.append("\n  Dropwizard counters size: 
").append(getDropwizardCounters().size());
+        sb.append("\n  Dropwizard histograms size: 
").append(getDropwizardHistograms().size());
+        sb.append("\n  Dropwizard meters size: 
").append(getDropwizardMeters().size());
+        sb.append("\n  Dropwizard timers size: 
").append(getDropwizardTimers().size()).append("\n");
+        System.out.println(sb.toString());
+    }
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/Metrics2Reporter.java
 
b/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/Metrics2Reporter.java
index e1da056..83dab81 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/Metrics2Reporter.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/Metrics2Reporter.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.kylin.common.KylinConfig;
 
 import com.codahale.metrics.MetricRegistry;
-import com.github.joshelser.dropwizard.metrics.hadoop.HadoopMetrics2Reporter;
 
 /**
  * A wrapper around Codahale HadoopMetrics2Reporter to make it a 
pluggable/configurable Kylin Metrics reporter.
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/metrics/metrics2/HadoopMetrics2ReporterTest.java
 
b/core-common/src/test/java/org/apache/kylin/common/metrics/metrics2/HadoopMetrics2ReporterTest.java
new file mode 100644
index 0000000..75752bd
--- /dev/null
+++ 
b/core-common/src/test/java/org/apache/kylin/common/metrics/metrics2/HadoopMetrics2ReporterTest.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.metrics.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Modified from https://github.com/joshelser/dropwizard-hadoop-metrics2, 
Copyright by Josh Elser
+ * Tests for {@link HadoopMetrics2Reporter}.
+ */
+public class HadoopMetrics2ReporterTest {
+
+    private MetricRegistry mockRegistry;
+    private MetricsSystem mockMetricsSystem;
+    private String recordName = "myserver";
+    private HadoopMetrics2Reporter metrics2Reporter;
+
+    @Before
+    public void setup() {
+        mockRegistry = mock(MetricRegistry.class);
+        mockMetricsSystem = mock(MetricsSystem.class);
+
+        recordName = "myserver";
+        metrics2Reporter = 
HadoopMetrics2Reporter.forRegistry(mockRegistry).convertDurationsTo(TimeUnit.MILLISECONDS)
+                .convertRatesTo(TimeUnit.SECONDS).build(mockMetricsSystem, 
"MyServer", "My Cool Server", recordName);
+    }
+
+    private void verifyRecordBuilderUnits(MetricsRecordBuilder recordBuilder) {
+        verify(recordBuilder).tag(HadoopMetrics2Reporter.RATE_UNIT_LABEL, 
metrics2Reporter.getRateUnit());
+        verify(recordBuilder).tag(HadoopMetrics2Reporter.DURATION_UNIT_LABEL, 
metrics2Reporter.getDurationUnit());
+    }
+
+    @Test
+    public void testBuilderDefaults() {
+        HadoopMetrics2Reporter.Builder builder = 
HadoopMetrics2Reporter.forRegistry(mockRegistry);
+
+        final String jmxContext = "MyJmxContext;sub=Foo";
+        final String desc = "Description";
+        final String recordName = "Metrics";
+
+        HadoopMetrics2Reporter reporter = builder.build(mockMetricsSystem, 
jmxContext, desc, recordName);
+
+        assertEquals(mockMetricsSystem, reporter.getMetrics2System());
+        // The Context "tag", not the jmx context
+        assertEquals(null, reporter.getContext());
+        assertEquals(recordName, reporter.getRecordName());
+    }
+
+    @Test
+    public void testGaugeReporting() {
+        final AtomicLong gaugeValue = new AtomicLong(0L);
+        @SuppressWarnings("rawtypes")
+        final Gauge gauge = new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return gaugeValue.get();
+            }
+        };
+
+        @SuppressWarnings("rawtypes")
+        TreeMap<String, Gauge> gauges = new TreeMap<>();
+        gauges.put("my_gauge", gauge);
+        // Add the metrics objects to the internal "queues" by hand
+        metrics2Reporter.setDropwizardGauges(gauges);
+
+        // Set some values
+        gaugeValue.set(5L);
+
+        MetricsCollector collector = mock(MetricsCollector.class);
+        MetricsRecordBuilder recordBuilder = mock(MetricsRecordBuilder.class);
+
+        
Mockito.when(collector.addRecord(recordName)).thenReturn(recordBuilder);
+
+        // Make sure a value of 5 gets reported
+        metrics2Reporter.getMetrics(collector, true);
+
+        verify(recordBuilder).addGauge(Interns.info("my_gauge", ""), 
gaugeValue.get());
+        verifyRecordBuilderUnits(recordBuilder);
+
+        // Should not be the same instance we gave before. Our map should have 
gotten swapped out.
+        assertTrue("Should not be the same map instance after collection",
+                gauges != metrics2Reporter.getDropwizardGauges());
+    }
+
+    @Test
+    public void testCounterReporting() {
+        final Counter counter = new Counter();
+
+        TreeMap<String, Counter> counters = new TreeMap<>();
+        counters.put("my_counter", counter);
+        // Add the metrics objects to the internal "queues" by hand
+        metrics2Reporter.setDropwizardCounters(counters);
+
+        // Set some values
+        counter.inc(5L);
+
+        MetricsCollector collector = mock(MetricsCollector.class);
+        MetricsRecordBuilder recordBuilder = mock(MetricsRecordBuilder.class);
+
+        
Mockito.when(collector.addRecord(recordName)).thenReturn(recordBuilder);
+
+        metrics2Reporter.getMetrics(collector, true);
+
+        verify(recordBuilder).addCounter(Interns.info("my_counter", ""), 5L);
+        verifyRecordBuilderUnits(recordBuilder);
+
+        // Should not be the same instance we gave before. Our map should have 
gotten swapped out.
+        assertTrue("Should not be the same map instance after collection",
+                counters != metrics2Reporter.getDropwizardCounters());
+    }
+
+    @Test
+    public void testHistogramReporting() {
+        final String metricName = "my_histogram";
+        final Histogram histogram = mock(Histogram.class);
+        final Snapshot snapshot = mock(Snapshot.class);
+
+        long count = 10L;
+        double percentile75 = 75;
+        double percentile95 = 95;
+        double percentile98 = 98;
+        double percentile99 = 99;
+        double percentile999 = 999;
+        double median = 50;
+        double mean = 60;
+        long min = 1L;
+        long max = 100L;
+        double stddev = 10;
+
+        when(snapshot.get75thPercentile()).thenReturn(percentile75);
+        when(snapshot.get95thPercentile()).thenReturn(percentile95);
+        when(snapshot.get98thPercentile()).thenReturn(percentile98);
+        when(snapshot.get99thPercentile()).thenReturn(percentile99);
+        when(snapshot.get999thPercentile()).thenReturn(percentile999);
+        when(snapshot.getMedian()).thenReturn(median);
+        when(snapshot.getMean()).thenReturn(mean);
+        when(snapshot.getMin()).thenReturn(min);
+        when(snapshot.getMax()).thenReturn(max);
+        when(snapshot.getStdDev()).thenReturn(stddev);
+
+        when(histogram.getCount()).thenReturn(count);
+        when(histogram.getSnapshot()).thenReturn(snapshot);
+
+        MetricsCollector collector = mock(MetricsCollector.class);
+        MetricsRecordBuilder recordBuilder = mock(MetricsRecordBuilder.class);
+
+        
Mockito.when(collector.addRecord(recordName)).thenReturn(recordBuilder);
+
+        TreeMap<String, Histogram> histograms = new TreeMap<>();
+        histograms.put(metricName, histogram);
+        // Add the metrics objects to the internal "queues" by hand
+        metrics2Reporter.setDropwizardHistograms(histograms);
+
+        metrics2Reporter.getMetrics(collector, true);
+
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_max", ""), 
metrics2Reporter.convertDuration(max));
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_min", ""), 
metrics2Reporter.convertDuration(min));
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_median", 
""),
+                metrics2Reporter.convertDuration(median));
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_count", 
""), count);
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_stddev", 
""),
+                metrics2Reporter.convertDuration(stddev));
+
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_75thpercentile", ""),
+                metrics2Reporter.convertDuration(percentile75));
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_95thpercentile", ""),
+                metrics2Reporter.convertDuration(percentile95));
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_98thpercentile", ""),
+                metrics2Reporter.convertDuration(percentile98));
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_99thpercentile", ""),
+                metrics2Reporter.convertDuration(percentile99));
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_999thpercentile", ""),
+                metrics2Reporter.convertDuration(percentile999));
+
+        verifyRecordBuilderUnits(recordBuilder);
+
+        // Should not be the same instance we gave before. Our map should have 
gotten swapped out.
+        assertTrue("Should not be the same map instance after collection",
+                histograms != metrics2Reporter.getDropwizardHistograms());
+    }
+
+    @Test
+    public void testTimerReporting() {
+        final String metricName = "my_timer";
+        final Timer timer = mock(Timer.class);
+        final Snapshot snapshot = mock(Snapshot.class);
+
+        TreeMap<String, Timer> timers = new TreeMap<>();
+        timers.put(metricName, timer);
+        // Add the metrics objects to the internal "queues" by hand
+        metrics2Reporter.setDropwizardTimers(timers);
+
+        long count = 10L;
+        double meanRate = 1.0;
+        double oneMinRate = 2.0;
+        double fiveMinRate = 5.0;
+        double fifteenMinRate = 10.0;
+
+        when(timer.getCount()).thenReturn(count);
+        when(timer.getMeanRate()).thenReturn(meanRate);
+        when(timer.getOneMinuteRate()).thenReturn(oneMinRate);
+        when(timer.getFiveMinuteRate()).thenReturn(fiveMinRate);
+        when(timer.getFifteenMinuteRate()).thenReturn(fifteenMinRate);
+        when(timer.getSnapshot()).thenReturn(snapshot);
+
+        double percentile75 = 75;
+        double percentile95 = 95;
+        double percentile98 = 98;
+        double percentile99 = 99;
+        double percentile999 = 999;
+        double median = 50;
+        double mean = 60;
+        long min = 1L;
+        long max = 100L;
+        double stddev = 10;
+
+        when(snapshot.get75thPercentile()).thenReturn(percentile75);
+        when(snapshot.get95thPercentile()).thenReturn(percentile95);
+        when(snapshot.get98thPercentile()).thenReturn(percentile98);
+        when(snapshot.get99thPercentile()).thenReturn(percentile99);
+        when(snapshot.get999thPercentile()).thenReturn(percentile999);
+        when(snapshot.getMedian()).thenReturn(median);
+        when(snapshot.getMean()).thenReturn(mean);
+        when(snapshot.getMin()).thenReturn(min);
+        when(snapshot.getMax()).thenReturn(max);
+        when(snapshot.getStdDev()).thenReturn(stddev);
+
+        MetricsCollector collector = mock(MetricsCollector.class);
+        MetricsRecordBuilder recordBuilder = mock(MetricsRecordBuilder.class);
+
+        
Mockito.when(collector.addRecord(recordName)).thenReturn(recordBuilder);
+
+        metrics2Reporter.getMetrics(collector, true);
+
+        // We get the count from the meter and histogram
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_count", 
""), count);
+
+        // Verify the rates
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_mean_rate", 
""),
+                metrics2Reporter.convertRate(meanRate));
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_1min_rate", 
""),
+                metrics2Reporter.convertRate(oneMinRate));
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_5min_rate", 
""),
+                metrics2Reporter.convertRate(fiveMinRate));
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_15min_rate", ""),
+                metrics2Reporter.convertRate(fifteenMinRate));
+
+        // Verify the histogram
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_max", ""), 
metrics2Reporter.convertDuration(max));
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_min", ""), 
metrics2Reporter.convertDuration(min));
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_median", 
""),
+                metrics2Reporter.convertDuration(median));
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_stddev", 
""),
+                metrics2Reporter.convertDuration(stddev));
+
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_75thpercentile", ""),
+                metrics2Reporter.convertDuration(percentile75));
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_95thpercentile", ""),
+                metrics2Reporter.convertDuration(percentile95));
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_98thpercentile", ""),
+                metrics2Reporter.convertDuration(percentile98));
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_99thpercentile", ""),
+                metrics2Reporter.convertDuration(percentile99));
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_999thpercentile", ""),
+                metrics2Reporter.convertDuration(percentile999));
+
+        verifyRecordBuilderUnits(recordBuilder);
+
+        // Should not be the same instance we gave before. Our map should have 
gotten swapped out.
+        assertTrue("Should not be the same map instance after collection",
+                timers != metrics2Reporter.getDropwizardTimers());
+    }
+
+    @Test
+    public void testMeterReporting() {
+        final String metricName = "my_meter";
+        final Meter meter = mock(Meter.class);
+
+        TreeMap<String, Meter> meters = new TreeMap<>();
+        meters.put(metricName, meter);
+        // Add the metrics objects to the internal "queues" by hand
+        metrics2Reporter.setDropwizardMeters(meters);
+
+        // Set some values
+        long count = 10L;
+        double meanRate = 1.0;
+        double oneMinRate = 2.0;
+        double fiveMinRate = 5.0;
+        double fifteenMinRate = 10.0;
+
+        when(meter.getCount()).thenReturn(count);
+        when(meter.getMeanRate()).thenReturn(meanRate);
+        when(meter.getOneMinuteRate()).thenReturn(oneMinRate);
+        when(meter.getFiveMinuteRate()).thenReturn(fiveMinRate);
+        when(meter.getFifteenMinuteRate()).thenReturn(fifteenMinRate);
+
+        MetricsCollector collector = mock(MetricsCollector.class);
+        MetricsRecordBuilder recordBuilder = mock(MetricsRecordBuilder.class);
+
+        
Mockito.when(collector.addRecord(recordName)).thenReturn(recordBuilder);
+
+        metrics2Reporter.getMetrics(collector, true);
+
+        // Verify the rates
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_count", 
""), count);
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_mean_rate", 
""),
+                metrics2Reporter.convertRate(meanRate));
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_1min_rate", 
""),
+                metrics2Reporter.convertRate(oneMinRate));
+        verify(recordBuilder).addGauge(Interns.info(metricName + "_5min_rate", 
""),
+                metrics2Reporter.convertRate(fiveMinRate));
+        verify(recordBuilder).addGauge(Interns.info(metricName + 
"_15min_rate", ""),
+                metrics2Reporter.convertRate(fifteenMinRate));
+
+        // Verify the units
+        verifyRecordBuilderUnits(recordBuilder);
+
+        // Should not be the same instance we gave before. Our map should have 
gotten swapped out.
+        assertTrue("Should not be the same map instance after collection",
+                meters != metrics2Reporter.getDropwizardMeters());
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void metrics2CycleIsNonDestructive() {
+        
metrics2Reporter.setDropwizardCounters(Collections.unmodifiableSortedMap(new 
TreeMap<String, Counter>()));
+        
metrics2Reporter.setDropwizardGauges(Collections.unmodifiableSortedMap(new 
TreeMap<String, Gauge>()));
+        
metrics2Reporter.setDropwizardHistograms(Collections.unmodifiableSortedMap(new 
TreeMap<String, Histogram>()));
+        
metrics2Reporter.setDropwizardMeters(Collections.unmodifiableSortedMap(new 
TreeMap<String, Meter>()));
+        
metrics2Reporter.setDropwizardTimers(Collections.unmodifiableSortedMap(new 
TreeMap<String, Timer>()));
+
+        MetricsCollector collector = mock(MetricsCollector.class);
+        MetricsRecordBuilder recordBuilder = mock(MetricsRecordBuilder.class);
+
+        
Mockito.when(collector.addRecord(recordName)).thenReturn(recordBuilder);
+
+        metrics2Reporter.getMetrics(collector, true);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void cachedMetricsAreClearedAfterCycle() {
+        // After we perform a metrics2 reporting cycle, the maps should be 
reset to avoid double-reporting
+        TreeMap<String, Counter> counters = new TreeMap<>();
+        TreeMap<String, Gauge> gauges = new TreeMap<>();
+        TreeMap<String, Histogram> histograms = new TreeMap<>();
+        TreeMap<String, Meter> meters = new TreeMap<>();
+        TreeMap<String, Timer> timers = new TreeMap<>();
+
+        metrics2Reporter.setDropwizardCounters(counters);
+        metrics2Reporter.setDropwizardGauges(gauges);
+        metrics2Reporter.setDropwizardHistograms(histograms);
+        metrics2Reporter.setDropwizardMeters(meters);
+        metrics2Reporter.setDropwizardTimers(timers);
+
+        MetricsCollector collector = mock(MetricsCollector.class);
+        MetricsRecordBuilder recordBuilder = mock(MetricsRecordBuilder.class);
+
+        
Mockito.when(collector.addRecord(recordName)).thenReturn(recordBuilder);
+
+        metrics2Reporter.getMetrics(collector, true);
+
+        assertTrue(counters != metrics2Reporter.getDropwizardCounters());
+        assertEquals(0, metrics2Reporter.getDropwizardCounters().size());
+        assertTrue(gauges != metrics2Reporter.getDropwizardGauges());
+        assertEquals(0, metrics2Reporter.getDropwizardGauges().size());
+        assertTrue(histograms != metrics2Reporter.getDropwizardHistograms());
+        assertEquals(0, metrics2Reporter.getDropwizardHistograms().size());
+        assertTrue(meters != metrics2Reporter.getDropwizardMeters());
+        assertEquals(0, metrics2Reporter.getDropwizardMeters().size());
+        assertTrue(timers != metrics2Reporter.getDropwizardTimers());
+        assertEquals(0, metrics2Reporter.getDropwizardTimers().size());
+    }
+}
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/metrics/metrics2/StandaloneExample.java
 
b/core-common/src/test/java/org/apache/kylin/common/metrics/metrics2/StandaloneExample.java
new file mode 100644
index 0000000..fabfdab
--- /dev/null
+++ 
b/core-common/src/test/java/org/apache/kylin/common/metrics/metrics2/StandaloneExample.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.metrics.metrics2;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.sink.FileSink;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Modified from https://github.com/joshelser/dropwizard-hadoop-metrics2, 
Copyright by Josh Elser
+ *
+ * A little utility to try to simulate "real-life" scenarios. Doesn't actually 
assert anything yet
+ * so it requires human interaction.
+ */
+public class StandaloneExample {
+
+    public static void main(String[] args) throws Exception {
+        final MetricRegistry metrics = new MetricRegistry();
+
+        final HadoopMetrics2Reporter metrics2Reporter = 
HadoopMetrics2Reporter.forRegistry(metrics).build(
+                DefaultMetricsSystem.initialize("StandaloneTest"), // The 
application-level name
+                "Test", // Component name
+                "Test", // Component description
+                "Test"); // Name for each metric record
+        final ConsoleReporter consoleReporter = 
ConsoleReporter.forRegistry(metrics).build();
+
+        MetricsSystem metrics2 = DefaultMetricsSystem.instance();
+        // Writes to stdout without a filename configuration
+        // Will be invoked every 10seconds by default
+        FileSink sink = new FileSink();
+        metrics2.register("filesink", "filesink", sink);
+        sink.init(new SubsetConfiguration(null, null) {
+            public String getString(String key) {
+                if (key.equals("filename")) {
+                    return null;
+                }
+                return super.getString(key);
+            }
+        });
+
+        // How often should the dropwizard reporter be invoked
+        metrics2Reporter.start(500, TimeUnit.MILLISECONDS);
+        // How often will the dropwziard metrics be logged to the console
+        consoleReporter.start(2, TimeUnit.SECONDS);
+
+        generateMetrics(metrics, 5000, 25, TimeUnit.MILLISECONDS, 
metrics2Reporter, 10);
+    }
+
+    /**
+     * Runs a number of threads which generate metrics.
+     */
+    public static void generateMetrics(final MetricRegistry metrics, final 
long metricsToGenerate, final int period,
+            final TimeUnit periodTimeUnit, HadoopMetrics2Reporter 
metrics2Reporter, int numThreads) throws Exception {
+        final ScheduledExecutorService pool = 
Executors.newScheduledThreadPool(numThreads);
+        final CountDownLatch latch = new CountDownLatch(numThreads);
+
+        for (int i = 0; i < numThreads; i++) {
+            final int id = i;
+            final int halfPeriod = (period / 2);
+            Runnable task = new Runnable() {
+                private long executions = 0;
+                final Random r = new Random();
+
+                @Override
+                public void run() {
+                    if (executions >= metricsToGenerate) {
+                        return;
+                    }
+                    metrics.counter("foo counter thread" + id).inc();
+                    executions++;
+                    if (executions < metricsToGenerate) {
+                        pool.schedule(this, period + r.nextInt(halfPeriod), 
periodTimeUnit);
+                    } else {
+                        latch.countDown();
+                    }
+                }
+            };
+            pool.schedule(task, period, periodTimeUnit);
+        }
+
+        while (!latch.await(2, TimeUnit.SECONDS)) {
+            metrics2Reporter.printQueueDebugMessage();
+        }
+
+        pool.shutdown();
+        pool.awaitTermination(5000, TimeUnit.SECONDS);
+    }
+}

Reply via email to