This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new a70707f  KYLIN-4508 Add unit test for core-metrics module & reporters
a70707f is described below

commit a70707f3b77d005aa54a5779c45eeb1ae6191070
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Fri Jun 19 13:41:09 2020 +0800

    KYLIN-4508 Add unit test for core-metrics module & reporters
---
 .../kylin/metrics/lib/impl/TimedRecordEvent.java   |  32 ++++
 metrics-reporter-hive/pom.xml                      |  25 ++++
 .../kylin/metrics/lib/impl/hive/HiveProducer.java  |   7 +-
 .../metrics/lib/impl/hive/HiveProducerRecord.java  | 141 +++++++-----
 .../lib/impl/hive/HiveReservoirReporter.java       |  39 +++--
 .../tool/metrics/systemcube/HiveTableCreator.java  |   3 +-
 .../lib/impl/hive/HiveProducerRecordTest.java      |  81 +++++++++
 .../metrics/lib/impl/hive/HiveProducerTest.java    | 161 +++++++++++++++
 .../lib/impl/hive/HiveReservoirReporterTest.java   |  88 +++++++++
 metrics-reporter-kafka/pom.xml                     |  19 +++
 .../impl/kafka/KafkaActiveReserviorListener.java   |   8 +
 .../lib/impl/kafka/KafkaReservoirReporter.java     |   6 +-
 .../lib/impl/kafka/KafkaReservoirReporterTest.java |  79 ++++++++
 13 files changed, 585 insertions(+), 104 deletions(-)

diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
index a866163..984d5f5 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
@@ -44,4 +44,36 @@ public class TimedRecordEvent extends RecordEvent {
         super.resetTime();
         addTimeDetails();
     }
+
+    public String getYear() {
+        return (String) get(TimePropertyEnum.YEAR.toString());
+    }
+
+    public String getMonth() {
+        return (String) get(TimePropertyEnum.MONTH.toString());
+    }
+
+    public String getWeekBeginDate() {
+        return (String) get(TimePropertyEnum.WEEK_BEGIN_DATE.toString());
+    }
+
+    public String getDayDate() {
+        return (String) get(TimePropertyEnum.DAY_DATE.toString());
+    }
+
+    public String getDayTime() {
+        return (String) get(TimePropertyEnum.DAY_TIME.toString());
+    }
+
+    public int getTimeHour() {
+        return (int) get(TimePropertyEnum.TIME_HOUR.toString());
+    }
+
+    public int getTimeMinute() {
+        return (int) get(TimePropertyEnum.TIME_MINUTE.toString());
+    }
+
+    public int getTimeSecond() {
+        return (int) get(TimePropertyEnum.TIME_SECOND.toString());
+    }
 }
diff --git a/metrics-reporter-hive/pom.xml b/metrics-reporter-hive/pom.xml
index 0159804..87b7808 100644
--- a/metrics-reporter-hive/pom.xml
+++ b/metrics-reporter-hive/pom.xml
@@ -56,5 +56,30 @@
             <artifactId>hadoop-hdfs</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <!-- Env & Test -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java index 8bc7a43..e79010c 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java @@ -309,7 +309,7 @@ public class HiveProducer { fout = null; } - private HiveProducerRecord convertTo(Record record) throws Exception { + HiveProducerRecord convertTo(Record record) throws Exception { Map<String, Object> rawValue = record.getValueRaw(); //Set partition values for hive table @@ -330,7 +330,8 @@ public class HiveProducer { columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase(Locale.ROOT))); } - return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs, - columnValues); + HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder(tableNameSplits.getSecond()) + .setDbName(tableNameSplits.getFirst()).setPartitionKVs(partitionKVs).build(); + return new HiveProducerRecord(key, columnValues); } } diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java index 650d18a..fa5222f 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java @@ -30,23 +30,8 @@ public class HiveProducerRecord { private final RecordKey key; private final List<Object> value; - public HiveProducerRecord(String dbName, String tableName, Map<String, String> partitionKVs, List<Object> value) { - this.key = new RecordKey(dbName, tableName, partitionKVs); - this.value = value; - } - - public HiveProducerRecord(String tableName, Map<String, String> partitionKVs, List<Object> value) { - this.key = new RecordKey(tableName, partitionKVs); - this.value = value; - } - - public HiveProducerRecord(String dbName, String tableName, List<Object> value) { - this.key = new RecordKey(dbName, tableName); - this.value = value; - } - - public HiveProducerRecord(String tableName, List<Object> value) { - this.key = new RecordKey(tableName); + public HiveProducerRecord(RecordKey key, List<Object> value) { + this.key = key; this.value = value; } @@ -75,41 +60,55 @@ public class HiveProducerRecord { return sb.toString(); } + @Override public boolean equals(Object o) { - if (this == o) { + if (this == o) return true; - } else if (!(o instanceof HiveProducerRecord)) { + if (o == null || getClass() != o.getClass()) return false; - } else { - HiveProducerRecord that = (HiveProducerRecord) o; - if (this.key != null) { - if (!this.key.equals(that.key)) { - return false; - } - } else if (that.key != null) { - return false; - } - if (this.value != null) { - if (!this.value.equals(that.value)) { - return false; - } - } else if (that.value != null) { - return false; - } - } - return true; + + HiveProducerRecord record = (HiveProducerRecord) o; + + if (key != null ? !key.equals(record.key) : record.key != null) + return false; + return value != null ? value.equals(record.value) : record.value == null; } + @Override public int hashCode() { - int result = this.key != null ? this.key.hashCode() : 0; - result = 31 * result + (this.value != null ? 
this.value.hashCode() : 0); + int result = key != null ? key.hashCode() : 0; + result = 31 * result + (value != null ? value.hashCode() : 0); return result; } + public static class KeyBuilder { + private final String tableName; + private String dbName; + private Map<String, String> partitionKVs; + + public KeyBuilder(String tableName) { + this.tableName = tableName; + } + + public KeyBuilder setDbName(String dbName) { + this.dbName = dbName; + return this; + } + + public KeyBuilder setPartitionKVs(Map<String, String> partitionKVs) { + this.partitionKVs = partitionKVs; + return this; + } + + public RecordKey build() { + return new RecordKey(dbName, tableName, partitionKVs); + } + } + /** * Use to organize metrics message */ - public class RecordKey { + public static class RecordKey { public static final String DEFAULT_DB_NAME = "DEFAULT"; private final String dbName; @@ -126,18 +125,6 @@ public class HiveProducerRecord { this.partitionKVs = partitionKVs; } - public RecordKey(String tableName, Map<String, String> partitionKVs) { - this(null, tableName, partitionKVs); - } - - public RecordKey(String dbName, String tableName) { - this(dbName, tableName, null); - } - - public RecordKey(String tableName) { - this(null, tableName, null); - } - public String database() { return this.dbName; } @@ -152,47 +139,31 @@ public class HiveProducerRecord { public String toString() { String partitionKVs = this.partitionKVs == null ? "null" : this.partitionKVs.toString(); - return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs + ")"; + return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs + + ")"; } + @Override public boolean equals(Object o) { - if (this == o) { + if (this == o) return true; - } else if (!(o instanceof RecordKey)) { + if (o == null || getClass() != o.getClass()) return false; - } else { - RecordKey that = (RecordKey) o; - if (this.dbName != null) { - if (!this.dbName.equals(that.dbName)) { - return false; - } - } else if (that.dbName != null) { - return false; - } - - if (this.tableName != null) { - if (!this.tableName.equals(that.tableName)) { - return false; - } - } else if (that.tableName != null) { - return false; - } - - if (this.partitionKVs != null) { - if (!this.partitionKVs.equals(that.partitionKVs)) { - return false; - } - } else if (that.partitionKVs != null) { - return false; - } - } - return true; + + RecordKey recordKey = (RecordKey) o; + + if (dbName != null ? !dbName.equals(recordKey.dbName) : recordKey.dbName != null) + return false; + if (tableName != null ? !tableName.equals(recordKey.tableName) : recordKey.tableName != null) + return false; + return partitionKVs != null ? partitionKVs.equals(recordKey.partitionKVs) : recordKey.partitionKVs == null; } + @Override public int hashCode() { - int result = this.dbName != null ? this.dbName.hashCode() : 0; - result = 31 * result + (this.tableName != null ? this.tableName.hashCode() : 0); - result = 31 * result + (this.partitionKVs != null ? this.partitionKVs.hashCode() : 0); + int result = dbName != null ? dbName.hashCode() : 0; + result = 31 * result + (tableName != null ? tableName.hashCode() : 0); + result = 31 * result + (partitionKVs != null ? 
partitionKVs.hashCode() : 0); return result; } } diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java index 9d93e99..d1e252f 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java @@ -84,6 +84,10 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { stop(); } + HiveReservoirListener getListener() { + return listener; + } + /** * A builder for {@link HiveReservoirReporter} instances. */ @@ -107,15 +111,19 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { } } - private class HiveReservoirListener implements ActiveReservoirListener { + class HiveReservoirListener implements ActiveReservoirListener { private Properties props; private Map<String, HiveProducer> producerMap = new HashMap<>(); + private long nRecord = 0; + private long nRecordSkip = 0; + private long nUpdate = 0; + private HiveReservoirListener(Properties props) throws Exception { this.props = props; } - private synchronized HiveProducer getProducer(String metricType) throws Exception { + synchronized HiveProducer getProducer(String metricType) throws Exception { HiveProducer producer = producerMap.get(metricType); if (producer == null) { producer = new HiveProducer(metricType, props); @@ -129,6 +137,7 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { return true; } logger.info("Try to write {} records", records.size()); + long prevNRecord = nRecord; try { Map<String, List<Record>> queues = new HashMap<>(); for (Record record : records) { @@ -142,21 +151,17 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { for (Map.Entry<String, List<Record>> entry : queues.entrySet()) { HiveProducer producer = getProducer(entry.getKey()); producer.send(entry.getValue()); + nRecord += entry.getValue().size(); } queues.clear(); + if (nUpdate++ % 100 == 0) { + logger.info("Has done the update {} times with {} records reported, {} records skipped", nUpdate, + nRecord, nRecordSkip); + } } catch (Exception e) { + nRecordSkip += records.size() - (nRecord - prevNRecord); logger.error(e.getMessage(), e); - return false; - } - return true; - } - - public boolean onRecordUpdate(final Record record) { - try { - HiveProducer producer = getProducer(record.getSubject()); - producer.send(record); - } catch (Exception e) { - logger.error(e.getMessage(), e); + logger.warn("Has skipped reporting {} records", nRecordSkip); return false; } return true; @@ -168,5 +173,13 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { } producerMap.clear(); } + + public long getNRecord() { + return nRecord; + } + + public long getNRecordSkip() { + return nRecordSkip; + } } } diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java similarity index 99% rename from tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java rename to metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java index 35d9efb..2f9eb1d 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java +++ 
b/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java @@ -19,8 +19,8 @@ package org.apache.kylin.tool.metrics.systemcube; import java.util.List; - import java.util.Locale; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.metrics.lib.ActiveReservoirReporter; @@ -32,7 +32,6 @@ import org.apache.kylin.metrics.property.JobPropertyEnum; import org.apache.kylin.metrics.property.QueryCubePropertyEnum; import org.apache.kylin.metrics.property.QueryPropertyEnum; import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; - import org.apache.kylin.shaded.com.google.common.base.Strings; import org.apache.kylin.shaded.com.google.common.collect.Lists; diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java new file mode 100644 index 0000000..ead74ad --- /dev/null +++ b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl.hive; + +import static org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.DELIMITER; +import static org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey.DEFAULT_DB_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class HiveProducerRecordTest { + + @Test + public void testRecord() { + String dbName = "KYLIN"; + String tableName = "test"; + Map<String, String> partitionKVs = Maps.newHashMap(); + partitionKVs.put("key1", "value1"); + + Set<RecordKey> keySet = Sets.newHashSet(); + RecordKey key1 = new HiveProducerRecord.KeyBuilder(tableName).build(); + RecordKey key11 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(DEFAULT_DB_NAME).build(); + keySet.add(key1); + keySet.add(key11); + assertEquals(1, keySet.size()); + + RecordKey key2 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).build(); + RecordKey key3 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).setPartitionKVs(partitionKVs) + .build(); + keySet.add(key2); + keySet.add(key3); + assertEquals(3, keySet.size()); + assertEquals(dbName, key2.database()); + assertEquals(tableName, key2.table()); + + List<Object> value1 = Lists.<Object> newArrayList(1); + List<Object> value2 = Lists.<Object> newArrayList(1, "1"); + + assertNull(new HiveProducerRecord(key1, null).valueToString()); + + Set<HiveProducerRecord> recordSet = Sets.newHashSet(); + HiveProducerRecord record1 = new HiveProducerRecord(key1, value1); + HiveProducerRecord record11 = new HiveProducerRecord(key11, value1); + recordSet.add(record1); + recordSet.add(record11); + assertEquals(1, recordSet.size()); + assertEquals(key1, record1.key()); + assertEquals(value1, record1.value()); + + recordSet.add(new HiveProducerRecord(key1, value2)); + recordSet.add(new HiveProducerRecord(key2, value1)); + assertEquals(3, recordSet.size()); + assertEquals(1, record1.valueToString().split(DELIMITER).length); + } +} diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java new file mode 100644 index 0000000..2adc34f --- /dev/null +++ b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl.hive; + +import static org.junit.Assert.assertEquals; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metrics.lib.ActiveReservoirReporter; +import org.apache.kylin.metrics.lib.impl.RecordEvent; +import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; +import org.apache.kylin.metrics.lib.impl.TimedRecordEvent; +import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; +import org.apache.kylin.shaded.com.google.common.collect.Lists; +import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.apache.kylin.source.hive.HiveMetaStoreClientFactory; +import org.apache.kylin.tool.metrics.systemcube.HiveTableCreator; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.rule.PowerMockRule; + +@PrepareForTest(fullyQualifiedNames = { "org.apache.hadoop.fs.FileSystem", + "org.apache.kylin.source.hive.HiveMetaStoreClientFactory", + "org.apache.kylin.metrics.lib.impl.hive.HiveProducer$1" }) +public class HiveProducerTest { + + @Rule + public PowerMockRule rule = new PowerMockRule(); + + private HiveProducer hiveProducer; + private HiveMetaStoreClient metaStoreClient; + + @Before + public void setUp() throws Exception { + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta"); + + FileSystem hdfs = PowerMockito.mock(FileSystem.class); + URI uri = PowerMockito.mock(URI.class); + PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", Configuration.class)).toReturn(hdfs); + PowerMockito.when(hdfs.getUri()).thenReturn(uri); + PowerMockito.when(uri.toString()).thenReturn("hdfs"); + + HiveConf hiveConf = PowerMockito.mock(HiveConf.class); + String metricsType = new HiveSink() + .getTableFromSubject(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall()); + + hiveProducer = new HiveProducer(metricsType, new Properties(), hiveConf); + + metaStoreClient = PowerMockito.mock(HiveMetaStoreClient.class); + PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConf).thenReturn(metaStoreClient); + PowerMockito + .stub(PowerMockito.method(HiveMetaStoreClientFactory.class, "getHiveMetaStoreClient", HiveConf.class)) + .toReturn(metaStoreClient); + } + + @After + public void after() throws Exception { + System.clearProperty(KylinConfig.KYLIN_CONF); + } + + @Test + public void testProduce() throws Exception { + TimedRecordEvent rpcEvent = generateTestRPCRecord(); + + Map<String, String> partitionKVs = Maps.newHashMap(); + partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rpcEvent.getDayDate()); + + List<Object> value = Lists.newArrayList(rpcEvent.getHost(), "default", "test_cube", "sandbox", "NULL", 80L, 3L, + 3L, 0L, 0L, 0L, rpcEvent.getTime(), rpcEvent.getYear(), rpcEvent.getMonth(), + rpcEvent.getWeekBeginDate(), rpcEvent.getDayTime(), rpcEvent.getTimeHour(), 
rpcEvent.getTimeMinute(), + rpcEvent.getTimeSecond(), rpcEvent.getDayDate()); + + HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder("HIVE_metrics_query_rpc_test") + .setDbName("KYLIN").setPartitionKVs(partitionKVs).build(); + HiveProducerRecord target = new HiveProducerRecord(key, value); + + prepareMockForEvent(rpcEvent); + assertEquals(target, hiveProducer.convertTo(rpcEvent)); + } + + private void prepareMockForEvent(RecordEvent event) throws Exception { + String tableFullName = new HiveSink().getTableFromSubject(event.getEventType()); + Pair<String, String> tableNameSplits = ActiveReservoirReporter.getTableNameSplits(tableFullName); + String dbName = tableNameSplits.getFirst(); + String tableName = tableNameSplits.getSecond(); + + Table table = PowerMockito.mock(Table.class); + PowerMockito.when(metaStoreClient, "getTable", dbName, tableName).thenReturn(table); + + StorageDescriptor sd = PowerMockito.mock(StorageDescriptor.class); + PowerMockito.when(table, "getSd").thenReturn(sd); + PowerMockito.when(sd, "getLocation").thenReturn(null); + + List<Pair<String, String>> columns = HiveTableCreator.getHiveColumnsForMetricsQueryRPC(); + List<Pair<String, String>> partitions = HiveTableCreator.getPartitionKVsForHiveTable(); + columns.addAll(partitions); + List<FieldSchema> fields = Lists.newArrayListWithExpectedSize(columns.size()); + for (Pair<String, String> column : columns) { + fields.add(new FieldSchema(column.getFirst(), column.getSecond(), null)); + } + PowerMockito.when(metaStoreClient, "getFields", dbName, tableName).thenReturn(fields); + } + + private TimedRecordEvent generateTestRPCRecord() { + TimedRecordEvent rpcMetricsEvent = new TimedRecordEvent("metrics_query_rpc_test"); + setRPCWrapper(rpcMetricsEvent, "default", "test_cube", "sandbox", null); + setRPCStats(rpcMetricsEvent, 80L, 0L, 3L, 3L, 0L); + return rpcMetricsEvent; + } + + private static void setRPCWrapper(RecordEvent metricsEvent, String projectName, String realizationName, + String rpcServer, Throwable throwable) { + metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName); + metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), realizationName); + metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), rpcServer); + metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(), + throwable == null ? 
"NULL" : throwable.getClass().getName()); + } + + private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, long skipCount, long scanCount, + long returnCount, long aggrCount) { + metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), callTimeMs); + metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter + metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), scanCount); //Count scanned by region server + metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), returnCount);//Count returned by region server + metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); //Count filtered & aggregated by coprocessor + metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor + } +} diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java new file mode 100644 index 0000000..fbb656c --- /dev/null +++ b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl.hive; + +import static org.junit.Assert.assertEquals; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metrics.lib.ActiveReservoir; +import org.apache.kylin.metrics.lib.Record; +import org.apache.kylin.metrics.lib.impl.InstantReservoir; +import org.apache.kylin.metrics.lib.impl.RecordEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.rule.PowerMockRule; + +import com.google.common.collect.Lists; + +@PrepareForTest({ HiveReservoirReporter.HiveReservoirListener.class }) +public class HiveReservoirReporterTest { + + @Rule + public PowerMockRule rule = new PowerMockRule(); + + private HiveReservoirReporter hiveReporter; + private ActiveReservoir reservoir; + + @Before + public void setUp() throws Exception { + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta"); + + HiveProducer hiveProducer = PowerMockito.mock(HiveProducer.class); + PowerMockito.whenNew(HiveProducer.class).withAnyArguments().thenReturn(hiveProducer); + + reservoir = new InstantReservoir(); + reservoir.start(); + hiveReporter = HiveReservoirReporter.forRegistry(reservoir).build(); + } + + @After + public void after() throws Exception { + System.clearProperty(KylinConfig.KYLIN_CONF); + } + + @Test + public void testUpdate() throws Exception { + String metricsType = "TEST"; + Record record = new RecordEvent(metricsType); + reservoir.update(record); + assertEquals(0, hiveReporter.getListener().getNRecord()); + + hiveReporter.start(); + reservoir.update(record); + reservoir.update(record); + assertEquals(2, hiveReporter.getListener().getNRecord()); + + hiveReporter.stop(); + reservoir.update(record); + assertEquals(2, hiveReporter.getListener().getNRecord()); + + hiveReporter.start(); + reservoir.update(record); + PowerMockito.doThrow(new Exception()).when(hiveReporter.getListener().getProducer(metricsType)) + .send(Lists.newArrayList(record)); + reservoir.update(record); + assertEquals(3, hiveReporter.getListener().getNRecord()); + assertEquals(1, hiveReporter.getListener().getNRecordSkip()); + } +} diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml index 173febdc..30759d4 100644 --- a/metrics-reporter-kafka/pom.xml +++ b/metrics-reporter-kafka/pom.xml @@ -42,5 +42,24 @@ <artifactId>kafka_2.11</artifactId> <scope>provided</scope> </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4-rule-agent</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java index df79c57..b1a1bd1 100644 --- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java +++ 
b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java @@ -115,4 +115,12 @@ public abstract class KafkaActiveReserviorListener implements ActiveReservoirLis logger.debug("Cannot find topic {}", topic); topicsIfAvailable.put(topic, System.currentTimeMillis()); } + + public long getNRecord() { + return nRecord; + } + + public long getNRecordSkip() { + return nRecordSkip; + } } diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java index a7b58a6..97b839c 100644 --- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java +++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java @@ -88,6 +88,10 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter { stop(); } + KafkaReservoirListener getListener() { + return listener; + } + /** * A builder for {@link KafkaReservoirReporter} instances. */ @@ -113,7 +117,7 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter { } } - private class KafkaReservoirListener extends KafkaActiveReserviorListener { + class KafkaReservoirListener extends KafkaActiveReserviorListener { protected final Producer<byte[], byte[]> producer; private KafkaReservoirListener(Properties props) { diff --git a/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java b/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java new file mode 100644 index 0000000..4a14e66 --- /dev/null +++ b/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl.kafka; + +import static org.junit.Assert.assertEquals; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metrics.lib.ActiveReservoir; +import org.apache.kylin.metrics.lib.Record; +import org.apache.kylin.metrics.lib.impl.InstantReservoir; +import org.apache.kylin.metrics.lib.impl.RecordEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.rule.PowerMockRule; + +@PrepareForTest({ KafkaReservoirReporter.KafkaReservoirListener.class }) +public class KafkaReservoirReporterTest { + + @Rule + public PowerMockRule rule = new PowerMockRule(); + + private KafkaReservoirReporter kafkaReporter; + private ActiveReservoir reservoir; + + @Before + public void setUp() throws Exception { + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta"); + + KafkaProducer kafkaProducer = PowerMockito.mock(KafkaProducer.class); + PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducer); + + reservoir = new InstantReservoir(); + reservoir.start(); + kafkaReporter = KafkaReservoirReporter.forRegistry(reservoir).build(); + } + + @After + public void after() throws Exception { + System.clearProperty(KylinConfig.KYLIN_CONF); + } + + @Test + public void testUpdate() { + Record record = new RecordEvent("TEST"); + reservoir.update(record); + assertEquals(0, kafkaReporter.getListener().getNRecord()); + + kafkaReporter.start(); + reservoir.update(record); + reservoir.update(record); + assertEquals(2, kafkaReporter.getListener().getNRecord()); + + kafkaReporter.stop(); + reservoir.update(record); + assertEquals(2, kafkaReporter.getListener().getNRecord()); + assertEquals(0, kafkaReporter.getListener().getNRecordSkip()); + } +}
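
For reference, a minimal usage sketch of the new HiveProducerRecord.KeyBuilder API touched by this commit. It is not part of the patch itself: the class name HiveProducerRecordUsageSketch, the table/database names, and the column values are illustrative samples; the KeyBuilder, RecordKey, and HiveProducerRecord(RecordKey, List) calls are the ones introduced in the diff above.

// Usage sketch only -- not part of commit a70707f. Placed in the same package as the
// new tests so the record classes are directly visible; sample values are hypothetical.
package org.apache.kylin.metrics.lib.impl.hive;

import java.util.List;
import java.util.Map;

import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class HiveProducerRecordUsageSketch {

    public static void main(String[] args) {
        // Partition values; the real producer derives these from the record's time fields.
        Map<String, String> partitionKVs = Maps.newHashMap();
        partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), "2020-06-19");

        // Before this commit: new HiveProducerRecord(dbName, tableName, partitionKVs, values).
        // After: assemble the key through KeyBuilder, then hand it to the record.
        HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder("hive_metrics_query_rpc") // sample table name
                .setDbName("KYLIN") // when omitted, the key falls back to RecordKey.DEFAULT_DB_NAME
                .setPartitionKVs(partitionKVs)
                .build();

        List<Object> columnValues = Lists.<Object> newArrayList("sandbox", 80L, 3L); // sample column values
        HiveProducerRecord record = new HiveProducerRecord(key, columnValues);

        System.out.println(record.key());           // RecordKey(database=KYLIN, table=hive_metrics_query_rpc, partition=...)
        System.out.println(record.valueToString()); // column values joined by HiveProducerRecord.DELIMITER
    }
}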