This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master-hadoop3 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master-hadoop3 by this push: new d8af2c8 Revert "KYLIN-4508 Add unit test for core-metrics module & reporters" d8af2c8 is described below commit d8af2c8385535f804009bbbca1f618918de71702 Author: XiaoxiangYu <x...@apache.org> AuthorDate: Tue Oct 13 13:37:01 2020 +0800 Revert "KYLIN-4508 Add unit test for core-metrics module & reporters" This reverts commit 8d894e2c8098a41d3778933cc93484a2c5f582e3. --- .../kylin/metrics/lib/impl/TimedRecordEvent.java | 32 ---- metrics-reporter-hive/pom.xml | 25 ---- .../kylin/metrics/lib/impl/hive/HiveProducer.java | 7 +- .../metrics/lib/impl/hive/HiveProducerRecord.java | 141 +++++++++++------- .../lib/impl/hive/HiveReservoirReporter.java | 39 ++--- .../lib/impl/hive/HiveProducerRecordTest.java | 81 ----------- .../metrics/lib/impl/hive/HiveProducerTest.java | 161 --------------------- .../lib/impl/hive/HiveReservoirReporterTest.java | 88 ----------- metrics-reporter-kafka/pom.xml | 19 --- .../impl/kafka/KafkaActiveReserviorListener.java | 8 - .../lib/impl/kafka/KafkaReservoirReporter.java | 6 +- .../lib/impl/kafka/KafkaReservoirReporterTest.java | 79 ---------- .../tool/metrics/systemcube/HiveTableCreator.java | 3 +- 13 files changed, 104 insertions(+), 585 deletions(-) diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java index 984d5f5..a866163 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java @@ -44,36 +44,4 @@ public class TimedRecordEvent extends RecordEvent { super.resetTime(); addTimeDetails(); } - - public String getYear() { - return (String) get(TimePropertyEnum.YEAR.toString()); - } - - public String getMonth() { - return (String) get(TimePropertyEnum.MONTH.toString()); - } - - public String getWeekBeginDate() { - return (String) get(TimePropertyEnum.WEEK_BEGIN_DATE.toString()); - } - - public String getDayDate() { - return (String) get(TimePropertyEnum.DAY_DATE.toString()); - } - - public String getDayTime() { - return (String) get(TimePropertyEnum.DAY_TIME.toString()); - } - - public int getTimeHour() { - return (int) get(TimePropertyEnum.TIME_HOUR.toString()); - } - - public int getTimeMinute() { - return (int) get(TimePropertyEnum.TIME_MINUTE.toString()); - } - - public int getTimeSecond() { - return (int) get(TimePropertyEnum.TIME_SECOND.toString()); - } } diff --git a/metrics-reporter-hive/pom.xml b/metrics-reporter-hive/pom.xml index 87b7808..0159804 100644 --- a/metrics-reporter-hive/pom.xml +++ b/metrics-reporter-hive/pom.xml @@ -56,30 +56,5 @@ <artifactId>hadoop-hdfs</artifactId> <scope>provided</scope> </dependency> - - <!-- Env & Test --> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-api-mockito</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4-rule-agent</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> </dependencies> </project> \ No newline at end of file diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java index e79010c..8bc7a43 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java @@ -309,7 +309,7 @@ public class HiveProducer { fout = null; } - HiveProducerRecord convertTo(Record record) throws Exception { + private HiveProducerRecord convertTo(Record record) throws Exception { Map<String, Object> rawValue = record.getValueRaw(); //Set partition values for hive table @@ -330,8 +330,7 @@ public class HiveProducer { columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase(Locale.ROOT))); } - HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder(tableNameSplits.getSecond()) - .setDbName(tableNameSplits.getFirst()).setPartitionKVs(partitionKVs).build(); - return new HiveProducerRecord(key, columnValues); + return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs, + columnValues); } } diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java index fa5222f..650d18a 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java @@ -30,8 +30,23 @@ public class HiveProducerRecord { private final RecordKey key; private final List<Object> value; - public HiveProducerRecord(RecordKey key, List<Object> value) { - this.key = key; + public HiveProducerRecord(String dbName, String tableName, Map<String, String> partitionKVs, List<Object> value) { + this.key = new RecordKey(dbName, tableName, partitionKVs); + this.value = value; + } + + public HiveProducerRecord(String tableName, Map<String, String> partitionKVs, List<Object> value) { + this.key = new RecordKey(tableName, partitionKVs); + this.value = value; + } + + public HiveProducerRecord(String dbName, String tableName, List<Object> value) { + this.key = new RecordKey(dbName, tableName); + this.value = value; + } + + public HiveProducerRecord(String tableName, List<Object> value) { + this.key = new RecordKey(tableName); this.value = value; } @@ -60,55 +75,41 @@ public class HiveProducerRecord { return sb.toString(); } - @Override public boolean equals(Object o) { - if (this == o) + if (this == o) { return true; - if (o == null || getClass() != o.getClass()) - return false; - - HiveProducerRecord record = (HiveProducerRecord) o; - - if (key != null ? !key.equals(record.key) : record.key != null) + } else if (!(o instanceof HiveProducerRecord)) { return false; - return value != null ? value.equals(record.value) : record.value == null; + } else { + HiveProducerRecord that = (HiveProducerRecord) o; + if (this.key != null) { + if (!this.key.equals(that.key)) { + return false; + } + } else if (that.key != null) { + return false; + } + if (this.value != null) { + if (!this.value.equals(that.value)) { + return false; + } + } else if (that.value != null) { + return false; + } + } + return true; } - @Override public int hashCode() { - int result = key != null ? key.hashCode() : 0; - result = 31 * result + (value != null ? value.hashCode() : 0); + int result = this.key != null ? this.key.hashCode() : 0; + result = 31 * result + (this.value != null ? this.value.hashCode() : 0); return result; } - public static class KeyBuilder { - private final String tableName; - private String dbName; - private Map<String, String> partitionKVs; - - public KeyBuilder(String tableName) { - this.tableName = tableName; - } - - public KeyBuilder setDbName(String dbName) { - this.dbName = dbName; - return this; - } - - public KeyBuilder setPartitionKVs(Map<String, String> partitionKVs) { - this.partitionKVs = partitionKVs; - return this; - } - - public RecordKey build() { - return new RecordKey(dbName, tableName, partitionKVs); - } - } - /** * Use to organize metrics message */ - public static class RecordKey { + public class RecordKey { public static final String DEFAULT_DB_NAME = "DEFAULT"; private final String dbName; @@ -125,6 +126,18 @@ public class HiveProducerRecord { this.partitionKVs = partitionKVs; } + public RecordKey(String tableName, Map<String, String> partitionKVs) { + this(null, tableName, partitionKVs); + } + + public RecordKey(String dbName, String tableName) { + this(dbName, tableName, null); + } + + public RecordKey(String tableName) { + this(null, tableName, null); + } + public String database() { return this.dbName; } @@ -139,31 +152,47 @@ public class HiveProducerRecord { public String toString() { String partitionKVs = this.partitionKVs == null ? "null" : this.partitionKVs.toString(); - return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs - + ")"; + return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs + ")"; } - @Override public boolean equals(Object o) { - if (this == o) + if (this == o) { return true; - if (o == null || getClass() != o.getClass()) - return false; - - RecordKey recordKey = (RecordKey) o; - - if (dbName != null ? !dbName.equals(recordKey.dbName) : recordKey.dbName != null) - return false; - if (tableName != null ? !tableName.equals(recordKey.tableName) : recordKey.tableName != null) + } else if (!(o instanceof RecordKey)) { return false; - return partitionKVs != null ? partitionKVs.equals(recordKey.partitionKVs) : recordKey.partitionKVs == null; + } else { + RecordKey that = (RecordKey) o; + if (this.dbName != null) { + if (!this.dbName.equals(that.dbName)) { + return false; + } + } else if (that.dbName != null) { + return false; + } + + if (this.tableName != null) { + if (!this.tableName.equals(that.tableName)) { + return false; + } + } else if (that.tableName != null) { + return false; + } + + if (this.partitionKVs != null) { + if (!this.partitionKVs.equals(that.partitionKVs)) { + return false; + } + } else if (that.partitionKVs != null) { + return false; + } + } + return true; } - @Override public int hashCode() { - int result = dbName != null ? dbName.hashCode() : 0; - result = 31 * result + (tableName != null ? tableName.hashCode() : 0); - result = 31 * result + (partitionKVs != null ? partitionKVs.hashCode() : 0); + int result = this.dbName != null ? this.dbName.hashCode() : 0; + result = 31 * result + (this.tableName != null ? this.tableName.hashCode() : 0); + result = 31 * result + (this.partitionKVs != null ? this.partitionKVs.hashCode() : 0); return result; } } diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java index d1e252f..9d93e99 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java @@ -84,10 +84,6 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { stop(); } - HiveReservoirListener getListener() { - return listener; - } - /** * A builder for {@link HiveReservoirReporter} instances. */ @@ -111,19 +107,15 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { } } - class HiveReservoirListener implements ActiveReservoirListener { + private class HiveReservoirListener implements ActiveReservoirListener { private Properties props; private Map<String, HiveProducer> producerMap = new HashMap<>(); - private long nRecord = 0; - private long nRecordSkip = 0; - private long nUpdate = 0; - private HiveReservoirListener(Properties props) throws Exception { this.props = props; } - synchronized HiveProducer getProducer(String metricType) throws Exception { + private synchronized HiveProducer getProducer(String metricType) throws Exception { HiveProducer producer = producerMap.get(metricType); if (producer == null) { producer = new HiveProducer(metricType, props); @@ -137,7 +129,6 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { return true; } logger.info("Try to write {} records", records.size()); - long prevNRecord = nRecord; try { Map<String, List<Record>> queues = new HashMap<>(); for (Record record : records) { @@ -151,17 +142,21 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { for (Map.Entry<String, List<Record>> entry : queues.entrySet()) { HiveProducer producer = getProducer(entry.getKey()); producer.send(entry.getValue()); - nRecord += entry.getValue().size(); } queues.clear(); - if (nUpdate++ % 100 == 0) { - logger.info("Has done the update {} times with {} records reported, {} records skipped", nUpdate, - nRecord, nRecordSkip); - } } catch (Exception e) { - nRecordSkip += records.size() - (nRecord - prevNRecord); logger.error(e.getMessage(), e); - logger.warn("Has skipped reporting {} records", nRecordSkip); + return false; + } + return true; + } + + public boolean onRecordUpdate(final Record record) { + try { + HiveProducer producer = getProducer(record.getSubject()); + producer.send(record); + } catch (Exception e) { + logger.error(e.getMessage(), e); return false; } return true; @@ -173,13 +168,5 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { } producerMap.clear(); } - - public long getNRecord() { - return nRecord; - } - - public long getNRecordSkip() { - return nRecordSkip; - } } } diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java deleted file mode 100644 index ead74ad..0000000 --- a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.metrics.lib.impl.hive; - -import static org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.DELIMITER; -import static org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey.DEFAULT_DB_NAME; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey; -import org.junit.Test; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -public class HiveProducerRecordTest { - - @Test - public void testRecord() { - String dbName = "KYLIN"; - String tableName = "test"; - Map<String, String> partitionKVs = Maps.newHashMap(); - partitionKVs.put("key1", "value1"); - - Set<RecordKey> keySet = Sets.newHashSet(); - RecordKey key1 = new HiveProducerRecord.KeyBuilder(tableName).build(); - RecordKey key11 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(DEFAULT_DB_NAME).build(); - keySet.add(key1); - keySet.add(key11); - assertEquals(1, keySet.size()); - - RecordKey key2 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).build(); - RecordKey key3 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).setPartitionKVs(partitionKVs) - .build(); - keySet.add(key2); - keySet.add(key3); - assertEquals(3, keySet.size()); - assertEquals(dbName, key2.database()); - assertEquals(tableName, key2.table()); - - List<Object> value1 = Lists.<Object> newArrayList(1); - List<Object> value2 = Lists.<Object> newArrayList(1, "1"); - - assertNull(new HiveProducerRecord(key1, null).valueToString()); - - Set<HiveProducerRecord> recordSet = Sets.newHashSet(); - HiveProducerRecord record1 = new HiveProducerRecord(key1, value1); - HiveProducerRecord record11 = new HiveProducerRecord(key11, value1); - recordSet.add(record1); - recordSet.add(record11); - assertEquals(1, recordSet.size()); - assertEquals(key1, record1.key()); - assertEquals(value1, record1.value()); - - recordSet.add(new HiveProducerRecord(key1, value2)); - recordSet.add(new HiveProducerRecord(key2, value1)); - assertEquals(3, recordSet.size()); - assertEquals(1, record1.valueToString().split(DELIMITER).length); - } -} diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java deleted file mode 100644 index 2adc34f..0000000 --- a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.metrics.lib.impl.hive; - -import static org.junit.Assert.assertEquals; - -import java.net.URI; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metrics.lib.ActiveReservoirReporter; -import org.apache.kylin.metrics.lib.impl.RecordEvent; -import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; -import org.apache.kylin.metrics.lib.impl.TimedRecordEvent; -import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; -import org.apache.kylin.shaded.com.google.common.collect.Lists; -import org.apache.kylin.shaded.com.google.common.collect.Maps; -import org.apache.kylin.source.hive.HiveMetaStoreClientFactory; -import org.apache.kylin.tool.metrics.systemcube.HiveTableCreator; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.rule.PowerMockRule; - -@PrepareForTest(fullyQualifiedNames = { "org.apache.hadoop.fs.FileSystem", - "org.apache.kylin.source.hive.HiveMetaStoreClientFactory", - "org.apache.kylin.metrics.lib.impl.hive.HiveProducer$1" }) -public class HiveProducerTest { - - @Rule - public PowerMockRule rule = new PowerMockRule(); - - private HiveProducer hiveProducer; - private HiveMetaStoreClient metaStoreClient; - - @Before - public void setUp() throws Exception { - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta"); - - FileSystem hdfs = PowerMockito.mock(FileSystem.class); - URI uri = PowerMockito.mock(URI.class); - PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", Configuration.class)).toReturn(hdfs); - PowerMockito.when(hdfs.getUri()).thenReturn(uri); - PowerMockito.when(uri.toString()).thenReturn("hdfs"); - - HiveConf hiveConf = PowerMockito.mock(HiveConf.class); - String metricsType = new HiveSink() - .getTableFromSubject(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall()); - - hiveProducer = new HiveProducer(metricsType, new Properties(), hiveConf); - - metaStoreClient = PowerMockito.mock(HiveMetaStoreClient.class); - PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConf).thenReturn(metaStoreClient); - PowerMockito - .stub(PowerMockito.method(HiveMetaStoreClientFactory.class, "getHiveMetaStoreClient", HiveConf.class)) - .toReturn(metaStoreClient); - } - - @After - public void after() throws Exception { - System.clearProperty(KylinConfig.KYLIN_CONF); - } - - @Test - public void testProduce() throws Exception { - TimedRecordEvent rpcEvent = generateTestRPCRecord(); - - Map<String, String> partitionKVs = Maps.newHashMap(); - partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rpcEvent.getDayDate()); - - List<Object> value = Lists.newArrayList(rpcEvent.getHost(), "default", "test_cube", "sandbox", "NULL", 80L, 3L, - 3L, 0L, 0L, 0L, rpcEvent.getTime(), rpcEvent.getYear(), rpcEvent.getMonth(), - rpcEvent.getWeekBeginDate(), rpcEvent.getDayTime(), rpcEvent.getTimeHour(), rpcEvent.getTimeMinute(), - rpcEvent.getTimeSecond(), rpcEvent.getDayDate()); - - HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder("HIVE_metrics_query_rpc_test") - .setDbName("KYLIN").setPartitionKVs(partitionKVs).build(); - HiveProducerRecord target = new HiveProducerRecord(key, value); - - prepareMockForEvent(rpcEvent); - assertEquals(target, hiveProducer.convertTo(rpcEvent)); - } - - private void prepareMockForEvent(RecordEvent event) throws Exception { - String tableFullName = new HiveSink().getTableFromSubject(event.getEventType()); - Pair<String, String> tableNameSplits = ActiveReservoirReporter.getTableNameSplits(tableFullName); - String dbName = tableNameSplits.getFirst(); - String tableName = tableNameSplits.getSecond(); - - Table table = PowerMockito.mock(Table.class); - PowerMockito.when(metaStoreClient, "getTable", dbName, tableName).thenReturn(table); - - StorageDescriptor sd = PowerMockito.mock(StorageDescriptor.class); - PowerMockito.when(table, "getSd").thenReturn(sd); - PowerMockito.when(sd, "getLocation").thenReturn(null); - - List<Pair<String, String>> columns = HiveTableCreator.getHiveColumnsForMetricsQueryRPC(); - List<Pair<String, String>> partitions = HiveTableCreator.getPartitionKVsForHiveTable(); - columns.addAll(partitions); - List<FieldSchema> fields = Lists.newArrayListWithExpectedSize(columns.size()); - for (Pair<String, String> column : columns) { - fields.add(new FieldSchema(column.getFirst(), column.getSecond(), null)); - } - PowerMockito.when(metaStoreClient, "getFields", dbName, tableName).thenReturn(fields); - } - - private TimedRecordEvent generateTestRPCRecord() { - TimedRecordEvent rpcMetricsEvent = new TimedRecordEvent("metrics_query_rpc_test"); - setRPCWrapper(rpcMetricsEvent, "default", "test_cube", "sandbox", null); - setRPCStats(rpcMetricsEvent, 80L, 0L, 3L, 3L, 0L); - return rpcMetricsEvent; - } - - private static void setRPCWrapper(RecordEvent metricsEvent, String projectName, String realizationName, - String rpcServer, Throwable throwable) { - metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName); - metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), realizationName); - metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), rpcServer); - metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(), - throwable == null ? "NULL" : throwable.getClass().getName()); - } - - private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, long skipCount, long scanCount, - long returnCount, long aggrCount) { - metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), callTimeMs); - metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter - metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), scanCount); //Count scanned by region server - metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), returnCount);//Count returned by region server - metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); //Count filtered & aggregated by coprocessor - metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor - } -} diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java deleted file mode 100644 index fbb656c..0000000 --- a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.metrics.lib.impl.hive; - -import static org.junit.Assert.assertEquals; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.metrics.lib.ActiveReservoir; -import org.apache.kylin.metrics.lib.Record; -import org.apache.kylin.metrics.lib.impl.InstantReservoir; -import org.apache.kylin.metrics.lib.impl.RecordEvent; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.rule.PowerMockRule; - -import com.google.common.collect.Lists; - -@PrepareForTest({ HiveReservoirReporter.HiveReservoirListener.class }) -public class HiveReservoirReporterTest { - - @Rule - public PowerMockRule rule = new PowerMockRule(); - - private HiveReservoirReporter hiveReporter; - private ActiveReservoir reservoir; - - @Before - public void setUp() throws Exception { - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta"); - - HiveProducer hiveProducer = PowerMockito.mock(HiveProducer.class); - PowerMockito.whenNew(HiveProducer.class).withAnyArguments().thenReturn(hiveProducer); - - reservoir = new InstantReservoir(); - reservoir.start(); - hiveReporter = HiveReservoirReporter.forRegistry(reservoir).build(); - } - - @After - public void after() throws Exception { - System.clearProperty(KylinConfig.KYLIN_CONF); - } - - @Test - public void testUpdate() throws Exception { - String metricsType = "TEST"; - Record record = new RecordEvent(metricsType); - reservoir.update(record); - assertEquals(0, hiveReporter.getListener().getNRecord()); - - hiveReporter.start(); - reservoir.update(record); - reservoir.update(record); - assertEquals(2, hiveReporter.getListener().getNRecord()); - - hiveReporter.stop(); - reservoir.update(record); - assertEquals(2, hiveReporter.getListener().getNRecord()); - - hiveReporter.start(); - reservoir.update(record); - PowerMockito.doThrow(new Exception()).when(hiveReporter.getListener().getProducer(metricsType)) - .send(Lists.newArrayList(record)); - reservoir.update(record); - assertEquals(3, hiveReporter.getListener().getNRecord()); - assertEquals(1, hiveReporter.getListener().getNRecordSkip()); - } -} diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml index 30759d4..173febdc 100644 --- a/metrics-reporter-kafka/pom.xml +++ b/metrics-reporter-kafka/pom.xml @@ -42,24 +42,5 @@ <artifactId>kafka_2.11</artifactId> <scope>provided</scope> </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-api-mockito</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4-rule-agent</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> </dependencies> </project> \ No newline at end of file diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java index b1a1bd1..df79c57 100644 --- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java +++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java @@ -115,12 +115,4 @@ public abstract class KafkaActiveReserviorListener implements ActiveReservoirLis logger.debug("Cannot find topic {}", topic); topicsIfAvailable.put(topic, System.currentTimeMillis()); } - - public long getNRecord() { - return nRecord; - } - - public long getNRecordSkip() { - return nRecordSkip; - } } diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java index 97b839c..a7b58a6 100644 --- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java +++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java @@ -88,10 +88,6 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter { stop(); } - KafkaReservoirListener getListener() { - return listener; - } - /** * A builder for {@link KafkaReservoirReporter} instances. */ @@ -117,7 +113,7 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter { } } - class KafkaReservoirListener extends KafkaActiveReserviorListener { + private class KafkaReservoirListener extends KafkaActiveReserviorListener { protected final Producer<byte[], byte[]> producer; private KafkaReservoirListener(Properties props) { diff --git a/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java b/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java deleted file mode 100644 index 4a14e66..0000000 --- a/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.metrics.lib.impl.kafka; - -import static org.junit.Assert.assertEquals; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.metrics.lib.ActiveReservoir; -import org.apache.kylin.metrics.lib.Record; -import org.apache.kylin.metrics.lib.impl.InstantReservoir; -import org.apache.kylin.metrics.lib.impl.RecordEvent; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.rule.PowerMockRule; - -@PrepareForTest({ KafkaReservoirReporter.KafkaReservoirListener.class }) -public class KafkaReservoirReporterTest { - - @Rule - public PowerMockRule rule = new PowerMockRule(); - - private KafkaReservoirReporter kafkaReporter; - private ActiveReservoir reservoir; - - @Before - public void setUp() throws Exception { - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta"); - - KafkaProducer kafkaProducer = PowerMockito.mock(KafkaProducer.class); - PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducer); - - reservoir = new InstantReservoir(); - reservoir.start(); - kafkaReporter = KafkaReservoirReporter.forRegistry(reservoir).build(); - } - - @After - public void after() throws Exception { - System.clearProperty(KylinConfig.KYLIN_CONF); - } - - @Test - public void testUpdate() { - Record record = new RecordEvent("TEST"); - reservoir.update(record); - assertEquals(0, kafkaReporter.getListener().getNRecord()); - - kafkaReporter.start(); - reservoir.update(record); - reservoir.update(record); - assertEquals(2, kafkaReporter.getListener().getNRecord()); - - kafkaReporter.stop(); - reservoir.update(record); - assertEquals(2, kafkaReporter.getListener().getNRecord()); - assertEquals(0, kafkaReporter.getListener().getNRecordSkip()); - } -} diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java similarity index 99% rename from metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java rename to tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java index 2f9eb1d..35d9efb 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java @@ -19,8 +19,8 @@ package org.apache.kylin.tool.metrics.systemcube; import java.util.List; -import java.util.Locale; +import java.util.Locale; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.metrics.lib.ActiveReservoirReporter; @@ -32,6 +32,7 @@ import org.apache.kylin.metrics.property.JobPropertyEnum; import org.apache.kylin.metrics.property.QueryCubePropertyEnum; import org.apache.kylin.metrics.property.QueryPropertyEnum; import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; + import org.apache.kylin.shaded.com.google.common.base.Strings; import org.apache.kylin.shaded.com.google.common.collect.Lists;