KYLIN-2722 Introduce a new measure for dropwizard metrics framework, called active reservoir, for actively pushing metrics to reporters
This closes #77 Signed-off-by: Li Yang <liy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2b32aa4c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2b32aa4c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2b32aa4c Branch: refs/heads/master Commit: 2b32aa4ca91ccdb4a885c4c509ae173bea5658d5 Parents: b53e54f Author: Zhong <nju_y...@apache.org> Authored: Tue Aug 8 22:50:54 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Oct 20 07:05:39 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 10 +- core-metrics/pom.xml | 51 ++++ .../kylin/metrics/lib/ActiveReservoir.java | 40 +++ .../metrics/lib/ActiveReservoirFilter.java | 44 +++ .../metrics/lib/ActiveReservoirListener.java | 30 ++ .../metrics/lib/ActiveReservoirReporter.java | 51 ++++ .../org/apache/kylin/metrics/lib/Record.java | 51 ++++ .../java/org/apache/kylin/metrics/lib/Sink.java | 23 ++ .../lib/impl/AbstractActiveReservoir.java | 68 +++++ .../metrics/lib/impl/BaseScheduledReporter.java | 103 +++++++ .../metrics/lib/impl/BlockingReservoir.java | 167 ++++++++++++ .../metrics/lib/impl/InstantReservoir.java | 76 ++++++ .../kylin/metrics/lib/impl/MetricsSystem.java | 164 +++++++++++ .../kylin/metrics/lib/impl/RecordEvent.java | 272 +++++++++++++++++++ .../metrics/lib/impl/RecordEventTimeDetail.java | 77 ++++++ .../metrics/lib/impl/RecordEventWrapper.java | 61 +++++ .../kylin/metrics/lib/impl/ReporterBuilder.java | 48 ++++ .../kylin/metrics/lib/impl/StubReservoir.java | 54 ++++ .../metrics/lib/impl/StubReservoirReporter.java | 51 ++++ .../apache/kylin/metrics/lib/impl/StubSink.java | 30 ++ .../metrics/lib/impl/TimePropertyEnum.java | 49 ++++ metrics-reporter-hive/pom.xml | 53 ++++ .../metrics/lib/impl/hive/HiveProducer.java | 201 ++++++++++++++ .../lib/impl/hive/HiveProducerRecord.java | 196 +++++++++++++ 
.../lib/impl/hive/HiveReservoirReporter.java | 139 ++++++++++ .../kylin/metrics/lib/impl/hive/HiveSink.java | 30 ++ metrics-reporter-kafka/pom.xml | 46 ++++ .../kafka/KafkaActiveReserviorListener.java | 115 ++++++++ .../lib/impl/kafka/KafkaReservoirReporter.java | 139 ++++++++++ .../kylin/metrics/lib/impl/kafka/KafkaSink.java | 29 ++ pom.xml | 25 ++ 31 files changed, 2490 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 34d8b7c..f3cf6c0 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1295,9 +1295,9 @@ abstract public class KylinConfigBase implements Serializable { return Integer.valueOf(this.getOptional("kylin.restclient.connection.max-total", "200")); } - /** - * metric - */ + // ============================================================================ + // Metrics + // ============================================================================ public String getCoadhaleMetricsReportClassesNames() { return getOptional("kylin.metrics.reporter-classes", "org.apache.kylin.common.metrics.metrics2.JsonFileMetricsReporter,org.apache.kylin.common.metrics.metrics2.JmxMetricsReporter"); @@ -1315,4 +1315,8 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.metrics.perflogger-class", "org.apache.kylin.common.metrics.perflog.PerfLogger"); } + public String getMetricsActiveReservoirDefaultClass() { + return getOptional("kylin.metrics.active-reservoir-default-class", + 
"org.apache.kylin.metrics.lib.impl.StubReservoir"); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/pom.xml ---------------------------------------------------------------------- diff --git a/core-metrics/pom.xml b/core-metrics/pom.xml new file mode 100644 index 0000000..e436c97 --- /dev/null +++ b/core-metrics/pom.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>kylin-core-metrics</artifactId> + <packaging>jar</packaging> + <name>Apache Kylin - Core Metrics</name> + <description>Apache Kylin - Core Metrics</description> + + <parent> + <artifactId>kylin</artifactId> + <groupId>org.apache.kylin</groupId> + <version>2.3.0-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-common</artifactId> + </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java new file mode 100644 index 0000000..36ab759 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib; + +import java.io.Closeable; + +public interface ActiveReservoir extends Closeable { + + int size(); + + void update(Record record); + + void addListener(ActiveReservoirListener listener); + + void removeListener(ActiveReservoirListener listener); + + void removeAllListener(); + + void setHAListener(ActiveReservoirListener listener); + + void start(); + + void stop(); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java new file mode 100644 index 0000000..5cffcfc --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib; + +/** + * A filter used to determine whether or not an active reservoir should be reported, among other things. + */ +public interface ActiveReservoirFilter { + + /** + * Matches all active reservoirs, regardless of type or name. + */ + ActiveReservoirFilter ALL = new ActiveReservoirFilter() { + @Override + public boolean matches(String name, ActiveReservoir activeReservoir) { + return true; + } + }; + + /** + * Returns {@code true} if the active reservoir matches the filter; {@code false} otherwise. + * + * @param name the active reservoir's name + * @param activeReservoir the active reservoir + * @return {@code true} if the active reservoir matches the filter + */ + boolean matches(String name, ActiveReservoir activeReservoir); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java new file mode 100644 index 0000000..f64caba --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package org.apache.kylin.metrics.lib;

import java.io.Closeable;
import java.util.EventListener;
import java.util.List;

/**
 * Receives batches of {@link Record}s from an {@link ActiveReservoir}.
 */
public interface ActiveReservoirListener extends EventListener, Closeable {

    /**
     * Called by a reservoir with the records that have been updated.
     *
     * @param records the updated records
     * @return {@code true} when the records were handled; the reservoirs in
     *         this code base treat {@code false} as a failure and fall back
     *         to their HA listener
     */
    boolean onRecordUpdate(List<Record> records);

    /**
     * Closes the listener. Unlike {@link Closeable#close()}, this
     * declaration does not allow a checked {@code IOException}.
     */
    @Override
    void close();
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib; + +import java.io.Closeable; +import java.util.regex.Pattern; + +import org.apache.kylin.common.util.Pair; + +import com.google.common.base.Strings; + +public abstract class ActiveReservoirReporter implements Closeable { + + public static final String KYLIN_PREFIX = "KYLIN"; + + public static Pair<String, String> getTableNameSplits(String tableName) { + if (Strings.isNullOrEmpty(tableName)) { + return null; + } + + String[] splits = tableName.split(Pattern.quote(".")); + int i = 0; + String database = splits.length == 1 ? KYLIN_PREFIX : splits[i++]; + String tableNameOnly = splits[i]; + return new Pair(database, tableNameOnly); + } + + public static String getTableName(Pair<String, String> tableNameSplits) { + return tableNameSplits.getFirst() + "." + tableNameSplits.getSecond(); + } + + public abstract void start(); + + public abstract void stop(); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java new file mode 100644 index 0000000..a1bce1f --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
package org.apache.kylin.metrics.lib;

import java.util.Map;

/**
 * A single metrics record handed to an {@link ActiveReservoir}.
 */
public interface Record {

    /**
     * Returns the type used to classify this record.
     */
    String getType();

    /**
     * Returns the key used to keep records of the same category in order.
     */
    byte[] getKey();

    /**
     * Returns the contents of this record as bytes.
     */
    byte[] getValue();

    /**
     * Returns the raw contents of this record as a map of named values.
     */
    Map<String, Object> getValueRaw();

    /**
     * Returns the time at which this record was created.
     * NOTE(review): the unit is not visible here, presumably epoch
     * milliseconds; confirm against the implementations.
     */
    Long getTime();

    /**
     * Returns a copy of this record.
     */
    Record clone();
}
package org.apache.kylin.metrics.lib;

/**
 * Maps a subject to a table name.
 */
public interface Sink {

    /**
     * Returns the table that belongs to the given subject.
     * NOTE(review): what a "subject" is and how the table name is formed is
     * decided by the implementations, which are not visible here.
     *
     * @param subject the subject to look up
     * @return the table for {@code subject}
     */
    String getTableFromSubject(String subject);
}
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl; + +import java.util.List; + +import org.apache.kylin.metrics.lib.ActiveReservoir; +import org.apache.kylin.metrics.lib.ActiveReservoirListener; + +import com.google.common.collect.Lists; + +public abstract class AbstractActiveReservoir implements ActiveReservoir { + + protected List<ActiveReservoirListener> listeners = Lists.newArrayList(); + + protected ActiveReservoirListener listenerHA = new StubReservoirReporter().listener; + + protected boolean isReady = false; + + public void addListener(ActiveReservoirListener listener) { + listeners.add(listener); + } + + public void removeListener(ActiveReservoirListener listener) { + listener.close(); + listeners.remove(listener); + } + + public void removeAllListener() { + for (ActiveReservoirListener listener : listeners) { + listener.close(); + } + listeners.clear(); + } + + public void setHAListener(ActiveReservoirListener listener) { + this.listenerHA = listener; + } + + public void start() { + isReady = true; + } + + public void stop() { + isReady = false; + } + + public void close() { + stop(); + removeAllListener(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java new file mode 100644 index 0000000..531376a --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl; + +import java.io.Closeable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public abstract class BaseScheduledReporter implements Closeable { + + private static final Logger logger = LoggerFactory.getLogger(BaseScheduledReporter.class); + + private final ScheduledExecutorService executor; + + BaseScheduledReporter() { + this("default"); + } + + BaseScheduledReporter(String name) { + this(Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("metrics-scheduler-" + name + "-%d").build())); + } + + BaseScheduledReporter(ScheduledExecutorService executor) { + this.executor = executor; + } + + public abstract void report(); + + /** + * Starts the reporter polling at the given period. 
+ * + * @param period the amount of time between polls + * @param unit the unit for {@code period} + */ + public void start(long period, TimeUnit unit) { + executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + report(); + } catch (RuntimeException ex) { + logger.error("RuntimeException thrown from {}#report. Exception was suppressed.", + BaseScheduledReporter.this.getClass().getSimpleName(), ex); + } + } + }, period, period, unit); + } + + /** + * Stops the reporter and shuts down its thread of execution. + * + * Uses the shutdown pattern from http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html + */ + public void stop() { + executor.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + executor.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + /** + * Stops the reporter and shuts down its thread of execution. 
+ */ + @Override + public void close() { + stop(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java new file mode 100644 index 0000000..3301867 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.kylin.metrics.lib.ActiveReservoirListener; +import org.apache.kylin.metrics.lib.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class BlockingReservoir extends AbstractActiveReservoir { + + private static final Logger logger = LoggerFactory.getLogger(BlockingReservoir.class); + + private final BlockingQueue<Record> recordsQueue; + private final Thread scheduledReporter; + private final int MIN_REPORT_SIZE; + private final int MAX_REPORT_SIZE; + private final long MAX_REPORT_TIME; + private List<Record> records; + + public BlockingReservoir() { + this(1, 100); + } + + public BlockingReservoir(int minReportSize, int maxReportSize) { + this(minReportSize, maxReportSize, 10); + } + + public BlockingReservoir(int minReportSize, int maxReportSize, int MAX_REPORT_TIME) { + this.MAX_REPORT_SIZE = maxReportSize; + this.MIN_REPORT_SIZE = minReportSize; + this.MAX_REPORT_TIME = MAX_REPORT_TIME * 60 * 1000L; + + this.recordsQueue = new LinkedBlockingQueue<>(); + this.listeners = Lists.newArrayList(); + + this.records = Lists.newArrayListWithExpectedSize(MAX_REPORT_SIZE); + + scheduledReporter = new ThreadFactoryBuilder().setNameFormat("metrics-blocking-reservoir-scheduler-%d").build() + .newThread(new ReporterRunnable()); + } + + public void update(Record record) { + if (!isReady) { + logger.info("Current reservoir is not ready for update record"); + return; + } + try { + recordsQueue.put(record); + } catch (InterruptedException e) { + logger.warn("Thread is interrupted during putting value to blocking queue. 
\n" + e.toString()); + } + } + + public int size() { + return recordsQueue.size(); + } + + private void onRecordUpdate(boolean ifAll) { + if (ifAll) { + records = Lists.newArrayList(); + recordsQueue.drainTo(records); + } else { + records.clear(); + recordsQueue.drainTo(records, MAX_REPORT_SIZE); + } + + boolean ifSucceed = true; + for (ActiveReservoirListener listener : listeners) { + if (!notifyListenerOfUpdatedRecord(listener, records)) { + ifSucceed = false; + logger.warn("It fails to notify listener " + listener.toString() + " of updated records " + + records.toString()); + } + } + if (!ifSucceed) { + notifyListenerHAOfUpdatedRecord(records); + } + } + + private boolean notifyListenerOfUpdatedRecord(ActiveReservoirListener listener, List<Record> records) { + return listener.onRecordUpdate(records); + } + + private boolean notifyListenerHAOfUpdatedRecord(List<Record> records) { + logger.info("The HA listener " + listenerHA.toString() + " for updated records " + records.toString() + + " will be started"); + if (!notifyListenerOfUpdatedRecord(listenerHA, records)) { + logger.error("The HA listener also fails!!!"); + return false; + } + return true; + } + + public void start() { + super.start(); + scheduledReporter.start(); + } + + public void stop() { + super.stop(); + scheduledReporter.interrupt(); + try { + scheduledReporter.join(); + } catch (InterruptedException e) { + logger.warn("Interrupted during join"); + throw new RuntimeException(e); + } + } + + class ReporterRunnable implements Runnable { + + public void run() { + long startTime = System.currentTimeMillis(); + while (isReady) { + if (size() <= 0) { + logger.info("There's no record in the blocking queue."); + sleep(); + startTime = System.currentTimeMillis(); + continue; + } else if (size() < MIN_REPORT_SIZE && (System.currentTimeMillis() - startTime < MAX_REPORT_TIME)) { + logger.info("The number of records in the blocking queue is less than " + MIN_REPORT_SIZE + // + " and the duration from last 
reporting is less than " + MAX_REPORT_TIME + + "ms. Will delay to report!"); + sleep(); + continue; + } + + onRecordUpdate(false); + startTime = System.currentTimeMillis(); + } + onRecordUpdate(true); + logger.info("Reporter finishes reporting metrics."); + } + + private void sleep() { + try { + Thread.sleep(60 * 1000); + } catch (InterruptedException e) { + logger.warn("Interrupted during running"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java new file mode 100644 index 0000000..41b53cf --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl; + +import java.util.List; + +import org.apache.kylin.metrics.lib.ActiveReservoirListener; +import org.apache.kylin.metrics.lib.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class InstantReservoir extends AbstractActiveReservoir { + + private static final Logger logger = LoggerFactory.getLogger(InstantReservoir.class); + + public void update(Record record) { + if (!isReady) { + logger.info("Current reservoir is not ready for update record"); + return; + } + onRecordUpdate(record); + } + + public int size() { + return 0; + } + + private void onRecordUpdate(Record record) { + boolean ifSucceed = true; + for (ActiveReservoirListener listener : listeners) { + if (!notifyListenerOfUpdatedRecord(listener, record)) { + ifSucceed = false; + logger.warn( + "It fails to notify listener " + listener.toString() + " of updated record " + record.getKey()); + } + } + if (!ifSucceed) { + notifyListenerHAOfUpdatedRecord(record); + } + } + + private boolean notifyListenerOfUpdatedRecord(ActiveReservoirListener listener, Record record) { + List<Record> recordsList = Lists.newArrayList(); + recordsList.add(record); + return listener.onRecordUpdate(recordsList); + } + + private boolean notifyListenerHAOfUpdatedRecord(Record record) { + logger.info("The HA listener " + listenerHA.toString() + " for updated record " + record.getKey() + + " will be started"); + if (!notifyListenerOfUpdatedRecord(listenerHA, record)) { + logger.error("The HA listener also fails!!!"); + return false; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java new file mode 100644 index 0000000..dc0ab66 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metrics.lib.ActiveReservoir; +import org.apache.kylin.metrics.lib.ActiveReservoirFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.MetricRegistry; + +public class MetricsSystem extends MetricRegistry { + public static final MetricsSystem Metrics = new MetricsSystem(); + private static final Logger logger = LoggerFactory.getLogger(MetricsSystem.class); + private final ConcurrentHashMap<String, ActiveReservoir> activeReservoirs; + + private MetricsSystem() { + activeReservoirs = new ConcurrentHashMap<>(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + logger.info("Closing Metrics System"); + try { + shutdown(); + } catch (IOException e) { + logger.error("error during shutdown activeReservoirs and listeners", e); + } + logger.info("Closed Metrics System"); + } + }); + } + + public void shutdown() throws IOException { + for (ActiveReservoir entry : activeReservoirs.values()) { + entry.close(); + } + } + + public ActiveReservoir activeReservoir(String name) { + return getOrAddActiveReservoir(name); + } + + public ActiveReservoir register(String name, ActiveReservoir activeReservoir) { + if (name == null || activeReservoir == null) { + throw new IllegalArgumentException("neither of name or ActiveReservoir can be null"); + } + final ActiveReservoir existingReservoir = activeReservoirs.putIfAbsent(name, activeReservoir); + if (existingReservoir == null) { + onActiveReservoirAdded(activeReservoir); + } else { + throw new IllegalArgumentException("An active reservoir named " + name + " already exists"); + } + + return activeReservoir; + } + + /** + * Removes the active reservoir with the 
given name. + * + * @param name the name of the active reservoir + * @return whether or not the active reservoir was removed + */ + public boolean removeActiveReservoir(String name) { + final ActiveReservoir recordReservoir = activeReservoirs.remove(name); + if (recordReservoir != null) { + onActiveReservoirRemoved(recordReservoir); + return true; + } + return false; + } + + /** + * Removes all active reservoirs which match the given filter. + * + * @param filter a filter + */ + public void removeActiveReservoirMatching(ActiveReservoirFilter filter) { + for (Map.Entry<String, ActiveReservoir> entry : activeReservoirs.entrySet()) { + if (filter.matches(entry.getKey(), entry.getValue())) { + removeActiveReservoir(entry.getKey()); + } + } + } + + private void onActiveReservoirAdded(ActiveReservoir activeReservoir) { + activeReservoir.start(); + } + + private void onActiveReservoirRemoved(ActiveReservoir activeReservoir) { + try { + activeReservoir.close(); + } catch (IOException e) { + } + } + + /** + * Returns a map of all the active reservoirs in the metrics system and their names. + * + * @return all the active reservoirs in the metrics system + */ + public SortedMap<String, ActiveReservoir> getActiveReservoirs() { + return getActiveReservoirs(ActiveReservoirFilter.ALL); + } + + /** + * Returns a map of all the active reservoirs in the metrics system and their names which match the given filter. 
+ * + * @param filter the active reservoir filter to match + * @return all the active reservoirs in the metrics system + */ + public SortedMap<String, ActiveReservoir> getActiveReservoirs(ActiveReservoirFilter filter) { + final TreeMap<String, ActiveReservoir> reservoirs = new TreeMap<>(); + for (Map.Entry<String, ActiveReservoir> entry : activeReservoirs.entrySet()) { + if (filter.matches(entry.getKey(), entry.getValue())) { + reservoirs.put(entry.getKey(), entry.getValue()); + } + } + return Collections.unmodifiableSortedMap(reservoirs); + } + + private ActiveReservoir getOrAddActiveReservoir(String name) { + ActiveReservoir activeReservoir = activeReservoirs.get(name); + if (activeReservoir != null) { + return activeReservoir; + } else { + String defaultActiveReservoirClass = KylinConfig.getInstanceFromEnv() + .getMetricsActiveReservoirDefaultClass(); + try { + activeReservoir = (ActiveReservoir) Class.forName(defaultActiveReservoirClass).getConstructor() + .newInstance(); + } catch (Exception e) { + logger.warn( + "Failed to initialize the " + defaultActiveReservoirClass + ". The StubReservoir will be used"); + activeReservoir = new StubReservoir(); + } + return register(name, activeReservoir); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java new file mode 100644 index 0000000..f5bc797 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.metrics.lib.Record; + +import com.google.common.collect.Maps; + +public class RecordEvent implements Record, Map<String, Object>, Serializable { + + private static final ThreadLocal<ByteArrayOutputStream> _localBaos = new ThreadLocal<ByteArrayOutputStream>(); + + static String localHostname; + static { + try { + InetAddress addr = InetAddress.getLocalHost(); + localHostname = addr.getHostName() + ":" + addr.getHostAddress(); + } catch (UnknownHostException e) { + localHostname = "Unknown"; + } + } + + private final Map<String, Object> backingMap; + + private RecordEvent(Map<String, Object> map) { + this.backingMap = map; + } + + public RecordEvent(String eventType) { + this(eventType, localHostname); + } + + public RecordEvent(String eventType, long time) { + this(eventType, localHostname, time); + } + + public RecordEvent(String eventType, String host) { + this(eventType, host, System.currentTimeMillis()); + } + + public RecordEvent(String eventType, String host, long time) { + this(null, eventType, host, time); + } + + /** + 
* + * @param map + * @param eventType mandatory with null check + * @param host mandatory without null check + * @param time mandatory with null check + */ + public RecordEvent(Map<String, Object> map, String eventType, String host, long time) { + backingMap = map != null ? map : Maps.<String, Object> newHashMap(); + setEventType(eventType); + setHost(host); + setTime(time); + } + + public String getEventType() { + return (String) get(RecordReserveKeyEnum.TYPE.toString()); + } + + private void setEventType(String eventType) { + if (eventType == null) { + throw new IllegalArgumentException("EventType cannot be null."); + } + put(RecordReserveKeyEnum.TYPE.toString(), eventType); + } + + public String getHost() { + return (String) get(RecordReserveKeyEnum.HOST.toString()); + } + + private void setHost(String host) { + put(RecordReserveKeyEnum.HOST.toString(), host); + } + + public Long getTime() { + return (Long) get(RecordReserveKeyEnum.TIME.toString()); + } + + private void setTime(Long time) { + if (time == null) { + throw new IllegalArgumentException("Time cannot be null."); + } + put(RecordReserveKeyEnum.TIME.toString(), time); + } + + public void resetTime() { + setTime(System.currentTimeMillis()); + } + + public String getID() { + return (String) get(RecordReserveKeyEnum.ID.toString()); + } + + public void setID(String id) { + put(RecordReserveKeyEnum.ID.toString(), id); + } + + @Override + public void clear() { + backingMap.clear(); + } + + @Override + public boolean containsKey(Object key) { + return backingMap.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return backingMap.containsValue(value); + } + + @Override + public Set<Entry<String, Object>> entrySet() { + return backingMap.entrySet(); + } + + @Override + public boolean equals(Object o) { + return super.equals(o) || backingMap.equals(o); + } + + @Override + public Object get(Object key) { + return backingMap.get(key); + } + + @Override + public int hashCode() { + 
return backingMap.hashCode(); + } + + @Override + public boolean isEmpty() { + return backingMap.isEmpty(); + } + + @Override + public Set<String> keySet() { + return backingMap.keySet(); + } + + @Override + public Object put(String key, Object value) { + return backingMap.put(key, value); + } + + @Override + public void putAll(Map<? extends String, ? extends Object> t) { + backingMap.putAll(t); + } + + @Override + public Object remove(Object key) { + return backingMap.remove(key); + } + + @Override + public int size() { + return backingMap.size(); + } + + @Override + public String toString() { + return backingMap.toString(); + } + + @Override + public Collection<Object> values() { + return backingMap.values(); + } + + @Override + public String getType() { + return getEventType(); + } + + @Override + public byte[] getKey() { + return (getHost() + "-" + getTime() + "-" + getID()).getBytes(); + } + + @Override + /** + * Event type and time does not belong to value part + */ + public Map<String, Object> getValueRaw() { + Map<String, Object> cloneMap = Maps.newHashMap(backingMap); + cloneMap.remove(RecordReserveKeyEnum.TYPE.toString()); + return cloneMap; + } + + @Override + /** + * Event type does not belong to value part, it's for classification + */ + public byte[] getValue() { + try { + ByteArrayOutputStream baos = _localBaos.get(); + if (baos == null) { + baos = new ByteArrayOutputStream(); + _localBaos.set(baos); + } + baos.reset(); + JsonUtil.writeValue(baos, getValueRaw()); + return baos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e);//in mem, should not happen + } + } + + @Override + public RecordEvent clone() { + Map<String, Object> cloneMap = Maps.newHashMap(); + cloneMap.putAll(backingMap); + return new RecordEvent(cloneMap); + } + + public enum RecordReserveKeyEnum { + TYPE("EVENT_TYPE"), ID("EVENT_ID"), HOST("HOST"), TIME("KTIMESTAMP"); + + private final String reserveKey; + + private RecordReserveKeyEnum(String key) { + 
this.reserveKey = key; + } + + @Override + public String toString() { + return reserveKey; + } + + public RecordReserveKeyEnum getByKey(String key) { + for (RecordReserveKeyEnum reserveKey : RecordReserveKeyEnum.values()) { + if (reserveKey.reserveKey == key) { + return reserveKey; + } + } + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java new file mode 100644 index 0000000..ff97b9b --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl; + +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.TimeZone; + +import org.apache.kylin.common.KylinConfig; + +public class RecordEventTimeDetail { + private static final TimeZone timeZone; + private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>(); + private static final ThreadLocal<SimpleDateFormat> timeFormatThreadLocal = new ThreadLocal<SimpleDateFormat>(); + + static { + timeZone = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()); + } + + public final String year_begin_date; + public final String month_begin_date; + public final String date; + public final String time; + public final int hour; + public final int minute; + public final int second; + public final String week_begin_date; + + public RecordEventTimeDetail(long timeStamp) { + Calendar calendar = Calendar.getInstance(timeZone); + calendar.setTimeInMillis(timeStamp); + + SimpleDateFormat dateFormat = dateFormatThreadLocal.get(); + if (dateFormat == null) { + dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + dateFormat.setTimeZone(timeZone); + dateFormatThreadLocal.set(dateFormat); + } + SimpleDateFormat timeFormat = timeFormatThreadLocal.get(); + if (timeFormat == null) { + timeFormat = new SimpleDateFormat("HH:mm:ss"); + timeFormat.setTimeZone(timeZone); + timeFormatThreadLocal.set(timeFormat); + } + + String yearStr = String.format("%04d", calendar.get(Calendar.YEAR)); + String monthStr = String.format("%02d", calendar.get(Calendar.MONTH) + 1); + this.year_begin_date = yearStr + "-01-01"; + this.month_begin_date = yearStr + "-" + monthStr + "-01"; + this.date = dateFormat.format(calendar.getTime()); + this.time = timeFormat.format(calendar.getTime()); + this.hour = calendar.get(Calendar.HOUR_OF_DAY); + this.minute = calendar.get(Calendar.MINUTE); + this.second = calendar.get(Calendar.SECOND); + + long timeStampForWeekBegin = timeStamp; + 
timeStampForWeekBegin -= 3600000 * 24 * (calendar.get(Calendar.DAY_OF_WEEK) - 1); + calendar.setTimeInMillis(timeStampForWeekBegin); + this.week_begin_date = dateFormat.format(calendar.getTime()); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java new file mode 100644 index 0000000..7031129 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl; + +import java.io.Serializable; + +import org.apache.kylin.metrics.lib.Record; + +public class RecordEventWrapper implements Serializable { + + protected final RecordEvent metricsEvent; + + public RecordEventWrapper(RecordEvent metricsEvent) { + this.metricsEvent = metricsEvent; + + //Add time details + addTimeDetails(); + } + + private void addTimeDetails() { + RecordEventTimeDetail dateDetail = new RecordEventTimeDetail(metricsEvent.getTime()); + metricsEvent.put(TimePropertyEnum.YEAR.toString(), dateDetail.year_begin_date); + metricsEvent.put(TimePropertyEnum.MONTH.toString(), dateDetail.month_begin_date); + metricsEvent.put(TimePropertyEnum.WEEK_BEGIN_DATE.toString(), dateDetail.week_begin_date); + metricsEvent.put(TimePropertyEnum.DAY_DATE.toString(), dateDetail.date); + metricsEvent.put(TimePropertyEnum.DAY_TIME.toString(), dateDetail.time); + metricsEvent.put(TimePropertyEnum.TIME_HOUR.toString(), dateDetail.hour); + metricsEvent.put(TimePropertyEnum.TIME_MINUTE.toString(), dateDetail.minute); + metricsEvent.put(TimePropertyEnum.TIME_SECOND.toString(), dateDetail.second); + } + + public void resetTime() { + metricsEvent.resetTime(); + addTimeDetails(); + } + + public Record getMetricsRecord() { + return metricsEvent; + } + + @Override + public String toString() { + return metricsEvent.toString(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java new file mode 100644 index 0000000..22fadd3 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl; + +import java.util.Properties; + +import org.apache.kylin.metrics.lib.ActiveReservoir; +import org.apache.kylin.metrics.lib.ActiveReservoirReporter; + +public abstract class ReporterBuilder { + protected final ActiveReservoir registry; + protected final Properties props; + + protected ReporterBuilder(ActiveReservoir activeReservoir) { + this.registry = activeReservoir; + this.props = new Properties(); + } + + public ReporterBuilder setConfig(Properties props) { + if (props != null) { + this.props.putAll(props); + } + return this; + } + + /** + * Builds a {@link ActiveReservoirReporter} with the given properties. 
+ * + * @return a {@link ActiveReservoirReporter} + */ + public abstract ActiveReservoirReporter build() throws Exception; +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java new file mode 100644 index 0000000..fe69dec --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl; + +import org.apache.kylin.metrics.lib.ActiveReservoir; +import org.apache.kylin.metrics.lib.ActiveReservoirListener; +import org.apache.kylin.metrics.lib.Record; + +public class StubReservoir implements ActiveReservoir { + + public void addListener(ActiveReservoirListener listener) { + } + + public void removeListener(ActiveReservoirListener listener) { + } + + public void removeAllListener() { + } + + public void setHAListener(ActiveReservoirListener listener) { + } + + public void update(Record record) { + } + + public int size() { + return 0; + } + + public void start() { + } + + public void stop() { + } + + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java new file mode 100644 index 0000000..5e0e637 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl; + +import java.util.List; + +import org.apache.kylin.metrics.lib.ActiveReservoirListener; +import org.apache.kylin.metrics.lib.ActiveReservoirReporter; +import org.apache.kylin.metrics.lib.Record; + +public class StubReservoirReporter extends ActiveReservoirReporter { + + public static final String STUB_REPORTER_SUFFIX = "STUB"; + + public final StubReservoirListener listener = new StubReservoirListener(); + + public void start() { + } + + public void stop() { + } + + public void close() { + } + + private class StubReservoirListener implements ActiveReservoirListener { + + public boolean onRecordUpdate(final List<Record> records) { + return true; + } + + public void close() { + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java new file mode 100644 index 0000000..676c59c --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl; + +import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX; +import static org.apache.kylin.metrics.lib.impl.StubReservoirReporter.STUB_REPORTER_SUFFIX; + +import org.apache.kylin.metrics.lib.Sink; + +public class StubSink implements Sink { + public String getTableFromSubject(String subject) { + return KYLIN_PREFIX + "." + STUB_REPORTER_SUFFIX + "_" + subject; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java ---------------------------------------------------------------------- diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java new file mode 100644 index 0000000..1336843 --- /dev/null +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Names of the time-detail properties attached to metrics events. Each constant's
 * {@link #toString()} yields the property (column) name.
 */
public enum TimePropertyEnum {
    YEAR("KYEAR_BEGIN_DATE"), //
    MONTH("KMONTH_BEGIN_DATE"), //
    WEEK_BEGIN_DATE("KWEEK_BEGIN_DATE"), //
    DAY_DATE("KDAY_DATE"), //
    DAY_TIME("KDAY_TIME"), //
    TIME_HOUR("KTIME_HOUR"), //
    TIME_MINUTE("KTIME_MINUTE"), //
    TIME_SECOND("KTIME_SECOND");

    private final String propertyName;

    TimePropertyEnum(String propertyName) {
        this.propertyName = propertyName;
    }

    /**
     * Looks a constant up by its property name, ignoring case.
     *
     * @param propertyName the property name, e.g. "KDAY_DATE" or "kday_date"
     * @return the matching constant, or null when the name is null, empty or unknown
     */
    public static TimePropertyEnum getByPropertyName(String propertyName) {
        if (propertyName == null || propertyName.isEmpty()) {
            return null;
        }
        // Upper-case with a fixed locale: under e.g. a Turkish default locale
        // "i" would become a dotted capital I and no property would match.
        final String normalized = propertyName.toUpperCase(java.util.Locale.ROOT);
        for (TimePropertyEnum property : TimePropertyEnum.values()) {
            if (property.propertyName.equals(normalized)) {
                return property;
            }
        }
        return null;
    }

    @Override
    public String toString() {
        return propertyName;
    }
}
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>kylin-metrics-reporter-hive</artifactId> + <packaging>jar</packaging> + <name>Apache Kylin - Metrics Reporter Hive</name> + <description>Apache Kylin - Metrics Reporter Hive</description> + + <parent> + <artifactId>kylin</artifactId> + <groupId>org.apache.kylin</groupId> + <version>2.3.0-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-metrics</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-hcatalog-core</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java ---------------------------------------------------------------------- diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java 
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java new file mode 100644 index 0000000..26a81e3 --- /dev/null +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.metrics.lib.impl.hive; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metrics.lib.ActiveReservoirReporter; +import org.apache.kylin.metrics.lib.Record; +import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; +import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class HiveProducer { + + private static final Logger logger = LoggerFactory.getLogger(HiveProducer.class); + + private static final int CACHE_MAX_SIZE = 10; + + private final HiveConf hiveConf; + private final FileSystem hdfs; + private final LoadingCache<Pair<String, String>, Pair<String, List<FieldSchema>>> tableFieldSchemaCache; + private final String CONTENT_FILE_NAME; + + public HiveProducer(Properties props) throws Exception { + this(props, new HiveConf()); + } + + HiveProducer(Properties props, HiveConf hiveConfig) throws Exception { + hiveConf = hiveConfig; + for (Map.Entry<Object, Object> e : props.entrySet()) { + 
hiveConf.set(e.getKey().toString(), e.getValue().toString()); + } + + hdfs = FileSystem.get(hiveConf); + + tableFieldSchemaCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<Pair<String, String>, Pair<String, List<FieldSchema>>>() { + @Override + public void onRemoval(RemovalNotification<Pair<String, String>, Pair<String, List<FieldSchema>>> notification) { + logger.info("Field schema with table " + ActiveReservoirReporter.getTableName(notification.getKey()) + " is removed due to " + notification.getCause()); + } + }).maximumSize(CACHE_MAX_SIZE).build(new CacheLoader<Pair<String, String>, Pair<String, List<FieldSchema>>>() { + @Override + public Pair<String, List<FieldSchema>> load(Pair<String, String> tableName) throws Exception { + HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf); + String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond()).getSd().getLocation(); + List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(), tableName.getSecond()); + metaStoreClient.close(); + return new Pair(tableLocation, fields); + } + }); + + String hostName; + try { + hostName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + hostName = "UNKNOWN"; + } + CONTENT_FILE_NAME = hostName + "-part-0000"; + } + + public void close() { + tableFieldSchemaCache.cleanUp(); + } + + public void send(final Record record) throws Exception { + HiveProducerRecord hiveRecord = convertTo(record); + write(hiveRecord.key(), Lists.newArrayList(hiveRecord)); + } + + public void send(final List<Record> recordList) throws Exception { + Map<RecordKey, List<HiveProducerRecord>> recordMap = Maps.newHashMap(); + for (Record record : recordList) { + HiveProducerRecord hiveRecord = convertTo(record); + if (recordMap.get(hiveRecord.key()) == null) { + recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList()); + } + recordMap.get(hiveRecord.key()).add(hiveRecord); + } + 
+ for (Map.Entry<RecordKey, List<HiveProducerRecord>> entry : recordMap.entrySet()) { + write(entry.getKey(), entry.getValue()); + } + } + + private void write(RecordKey recordKey, Iterable<HiveProducerRecord> recordItr) throws Exception { + String tableLocation = tableFieldSchemaCache.get(new Pair<>(recordKey.database(), recordKey.table())).getFirst(); + StringBuilder sb = new StringBuilder(); + sb.append(tableLocation); + for (Map.Entry<String, String> e : recordKey.partition().entrySet()) { + sb.append("/"); + sb.append(e.getKey().toLowerCase()); + sb.append("="); + sb.append(e.getValue()); + } + Path partitionPath = new Path(sb.toString()); + if (!hdfs.exists(partitionPath)) { + StringBuilder hql = new StringBuilder(); + hql.append("ALTER TABLE "); + hql.append(recordKey.database() + "." + recordKey.table()); + hql.append(" ADD IF NOT EXISTS PARTITION ("); + boolean ifFirst = true; + for (Map.Entry<String, String> e : recordKey.partition().entrySet()) { + if (ifFirst) { + ifFirst = false; + } else { + hql.append(","); + } + hql.append(e.getKey().toLowerCase()); + hql.append("='" + e.getValue() + "'"); + } + hql.append(")"); + Driver driver = new Driver(hiveConf); + SessionState.start(new CliSessionState(hiveConf)); + driver.run(hql.toString()); + driver.close(); + } + Path partitionContentPath = new Path(partitionPath, CONTENT_FILE_NAME); + if (!hdfs.exists(partitionContentPath)) { + int nRetry = 0; + while (!hdfs.createNewFile(partitionContentPath) && nRetry++ < 5) { + if (hdfs.exists(partitionContentPath)) { + break; + } + Thread.sleep(500L * nRetry); + } + if (!hdfs.exists(partitionContentPath)) { + throw new RuntimeException("Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries"); + } + } + try (FSDataOutputStream fout = hdfs.append(partitionContentPath)) { + for (HiveProducerRecord elem : recordItr) { + fout.writeBytes(elem.valueToString() + "\n"); + } + } catch (IOException e) { + System.out.println("Fails to write metrics 
to file " + partitionContentPath.toString() + " due to " + e); + logger.error("Fails to write metrics to file " + partitionContentPath.toString() + " due to " + e); + } + } + + private HiveProducerRecord convertTo(Record record) throws Exception { + Map<String, Object> rawValue = record.getValueRaw(); + + //Set partition values for hive table + Map<String, String> partitionKVs = Maps.newHashMapWithExpectedSize(1); + partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rawValue.get(TimePropertyEnum.DAY_DATE.toString()).toString()); + + return parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getType()), partitionKVs, rawValue); + } + + public HiveProducerRecord parseToHiveProducerRecord(String tableName, Map<String, String> partitionKVs, Map<String, Object> rawValue) throws Exception { + Pair<String, String> tableNameSplits = ActiveReservoirReporter.getTableNameSplits(tableName); + List<FieldSchema> fields = tableFieldSchemaCache.get(tableNameSplits).getSecond(); + List<Object> columnValues = Lists.newArrayListWithExpectedSize(fields.size()); + for (FieldSchema fieldSchema : fields) { + columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase())); + } + + return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs, columnValues); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2b32aa4c/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java ---------------------------------------------------------------------- diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java new file mode 100644 index 0000000..8bf93ec --- /dev/null +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software 
/**
 * A row destined for a Hive table: a {@link RecordKey} naming the database, table and partition, plus
 * the ordered list of column values.
 */
public class HiveProducerRecord {

    /** Separator placed between column values by {@link #valueToString()}. */
    public static final String DELIMITER = ",";

    private final RecordKey key;
    private final List<Object> value;

    public HiveProducerRecord(String dbName, String tableName, Map<String, String> partitionKVs, List<Object> value) {
        this.key = new RecordKey(dbName, tableName, partitionKVs);
        this.value = value;
    }

    public HiveProducerRecord(String tableName, Map<String, String> partitionKVs, List<Object> value) {
        this.key = new RecordKey(tableName, partitionKVs);
        this.value = value;
    }

    public HiveProducerRecord(String dbName, String tableName, List<Object> value) {
        this.key = new RecordKey(dbName, tableName);
        this.value = value;
    }

    public HiveProducerRecord(String tableName, List<Object> value) {
        this.key = new RecordKey(tableName);
        this.value = value;
    }

    /** @return the database, table and partition this record belongs to */
    public RecordKey key() {
        return this.key;
    }

    /** @return the column values as passed to the constructor (may be {@code null}) */
    public List<Object> value() {
        return this.value;
    }

    @Override
    public String toString() {
        return "HiveProducerRecord(key=" + this.key + ", value=" + this.value + ")";
    }

    /**
     * Renders the column values as one line, separated by {@link #DELIMITER}. A {@code null} element is
     * rendered as the text {@code null}; values are not escaped.
     *
     * @return the joined values, or {@code null} if there are no values
     */
    public String valueToString() {
        if (this.value == null || this.value.isEmpty()) {
            return null;
        }
        // Iterate instead of calling get(i) so that linked lists are not traversed quadratically.
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (Object element : this.value) {
            if (first) {
                first = false;
            } else {
                sb.append(DELIMITER);
            }
            sb.append(element);
        }
        return sb.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof HiveProducerRecord)) {
            return false;
        }
        HiveProducerRecord that = (HiveProducerRecord) o;
        return java.util.Objects.equals(this.key, that.key) && java.util.Objects.equals(this.value, that.value);
    }

    @Override
    public int hashCode() {
        int result = java.util.Objects.hashCode(this.key);
        result = 31 * result + java.util.Objects.hashCode(this.value);
        return result;
    }

    /**
     * Identifies the destination of a record: database, table and partition values.
     *
     * <p>Declared {@code static} so that a key does not keep its originating record reachable, e.g.
     * when keys are used to group records in a map.
     */
    public static class RecordKey {
        /** Database used when none (or an empty one) is given. */
        public static final String DEFAULT_DB_NAME = "DEFAULT";

        private final String dbName;
        private final String tableName;
        private final Map<String, String> partitionKVs;

        /**
         * @param dbName       the database name; {@code null} or empty selects {@link #DEFAULT_DB_NAME}
         * @param tableName    the table name
         * @param partitionKVs the partition columns and values, or {@code null} if none are given
         */
        public RecordKey(String dbName, String tableName, Map<String, String> partitionKVs) {
            this.dbName = (dbName == null || dbName.isEmpty()) ? DEFAULT_DB_NAME : dbName;
            this.tableName = tableName;
            this.partitionKVs = partitionKVs;
        }

        public RecordKey(String tableName, Map<String, String> partitionKVs) {
            this(null, tableName, partitionKVs);
        }

        public RecordKey(String dbName, String tableName) {
            this(dbName, tableName, null);
        }

        public RecordKey(String tableName) {
            this(null, tableName, null);
        }

        public String database() {
            return this.dbName;
        }

        public String table() {
            return this.tableName;
        }

        /** @return the partition columns and values, or {@code null} if none were given */
        public Map<String, String> partition() {
            return this.partitionKVs;
        }

        @Override
        public String toString() {
            return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition="
                    + this.partitionKVs + ")";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof RecordKey)) {
                return false;
            }
            RecordKey that = (RecordKey) o;
            return java.util.Objects.equals(this.dbName, that.dbName)
                    && java.util.Objects.equals(this.tableName, that.tableName)
                    && java.util.Objects.equals(this.partitionKVs, that.partitionKVs);
        }

        @Override
        public int hashCode() {
            int result = java.util.Objects.hashCode(this.dbName);
            result = 31 * result + java.util.Objects.hashCode(this.tableName);
            result = 31 * result + java.util.Objects.hashCode(this.partitionKVs);
            return result;
        }
    }
}