This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new fd9c58a Adding NoopPinotMetricFactory and corresponding changes (#8270) fd9c58a is described below commit fd9c58a11ed16d27109baefcee138eea30132ad3 Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Tue Mar 1 23:37:41 2022 -0800 Adding NoopPinotMetricFactory and corresponding changes (#8270) --- .../presto/PinotScatterGatherQueryClient.java | 49 ++-- .../presto/grpc/PinotStreamingQueryClient.java | 13 - .../plugin/metrics/NoopPinotMetricFactory.java | 292 +++++++++++++++++++++ 3 files changed, 317 insertions(+), 37 deletions(-) diff --git a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java index 8846be4..6cb45ec 100644 --- a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java +++ b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.connector.presto; +import com.google.common.collect.ImmutableMap; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -35,15 +36,19 @@ import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.PinotMetricUtils; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.connector.presto.plugin.metrics.NoopPinotMetricFactory; import org.apache.pinot.core.transport.AsyncQueryResponse; import org.apache.pinot.core.transport.QueryRouter; import org.apache.pinot.core.transport.ServerInstance; import 
org.apache.pinot.core.transport.ServerResponse; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_METRICS_FACTORY_CLASS_NAME; + public class PinotScatterGatherQueryClient { private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler(); @@ -72,8 +77,7 @@ public class PinotScatterGatherQueryClient { } } - public static class PinotException - extends RuntimeException { + public static class PinotException extends RuntimeException { private final ErrorCode _errorCode; public PinotException(ErrorCode errorCode, String message, Throwable t) { @@ -188,6 +192,8 @@ public class PinotScatterGatherQueryClient { public PinotScatterGatherQueryClient(Config pinotConfig) { _prestoHostId = getDefaultPrestoId(); + PinotMetricUtils.init(new PinotConfiguration( + ImmutableMap.of(CONFIG_OF_METRICS_FACTORY_CLASS_NAME, NoopPinotMetricFactory.class.getName()))); _brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); _brokerMetrics.initializeGlobalMeters(); TlsConfig tlsConfig = getTlsConfig(pinotConfig); @@ -240,13 +246,8 @@ public class PinotScatterGatherQueryClient { return defaultBrokerId; } - public Map<ServerInstance, DataTable> queryPinotServerForDataTable( - String pql, - String serverHost, - List<String> segments, - long connectionTimeoutInMillis, - boolean ignoreEmptyResponses, - int pinotRetryCount) { + public Map<ServerInstance, DataTable> queryPinotServerForDataTable(String pql, String serverHost, + List<String> segments, long connectionTimeoutInMillis, boolean ignoreEmptyResponses, int pinotRetryCount) { BrokerRequest brokerRequest; try { brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(pql); @@ -260,7 +261,7 @@ public class 
PinotScatterGatherQueryClient { new ArrayList<>(segments)); // Unfortunately the retries will all hit the same server because the routing decision has already been made by - // the pinot broker + // the pinot broker Map<ServerInstance, DataTable> serverResponseMap = doWithRetries(pinotRetryCount, (requestId) -> { String rawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName()); if (!_concurrentQueriesCountMap.containsKey(serverHost)) { @@ -276,17 +277,17 @@ public class PinotScatterGatherQueryClient { QueryRouter nextAvailableQueryRouter = getNextAvailableQueryRouter(); if (TableNameBuilder.getTableTypeFromTableName(brokerRequest.getQuerySource().getTableName()) == TableType.REALTIME) { - asyncQueryResponse = nextAvailableQueryRouter.submitQuery(requestId, rawTableName, null, null, brokerRequest, - routingTable, connectionTimeoutInMillis); + asyncQueryResponse = + nextAvailableQueryRouter.submitQuery(requestId, rawTableName, null, null, brokerRequest, routingTable, + connectionTimeoutInMillis); } else { - asyncQueryResponse = nextAvailableQueryRouter - .submitQuery(requestId, rawTableName, brokerRequest, routingTable, null, null, connectionTimeoutInMillis); + asyncQueryResponse = + nextAvailableQueryRouter.submitQuery(requestId, rawTableName, brokerRequest, routingTable, null, null, + connectionTimeoutInMillis); } - Map<ServerInstance, DataTable> serverInstanceDataTableMap = gatherServerResponses( - ignoreEmptyResponses, - routingTable, - asyncQueryResponse, - brokerRequest.getQuerySource().getTableName()); + Map<ServerInstance, DataTable> serverInstanceDataTableMap = + gatherServerResponses(ignoreEmptyResponses, routingTable, asyncQueryResponse, + brokerRequest.getQuerySource().getTableName()); _queryRouters.offer(nextAvailableQueryRouter); _concurrentQueriesCountMap.get(serverHost).decrementAndGet(); return serverInstanceDataTableMap; @@ -320,15 +321,15 @@ public class PinotScatterGatherQueryClient { : 
entry.getValue().toString(); routingTableForLogging.put(entry.getKey().toString(), valueToPrint); }); - throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE, String - .format("%d of %d servers responded with routing table servers: %s, query stats: %s", + throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE, + String.format("%d of %d servers responded with routing table servers: %s, query stats: %s", queryResponses.size(), routingTable.size(), routingTableForLogging, asyncQueryResponse.getStats())); } } Map<ServerInstance, DataTable> serverResponseMap = new HashMap<>(); - queryResponses.entrySet().forEach(entry -> serverResponseMap.put( - new ServerInstance(new InstanceConfig( - String.format("Server_%s_%d", entry.getKey().getHostname(), entry.getKey().getPort()))), + queryResponses.entrySet().forEach(entry -> serverResponseMap.put(new ServerInstance( + new InstanceConfig(String.format("Server_%s_%d", entry.getKey().getHostname(), + entry.getKey().getPort()))), entry.getValue().getDataTable())); return serverResponseMap; } catch (InterruptedException e) { diff --git a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java index cbdb820..a5658bb 100644 --- a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java +++ b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/grpc/PinotStreamingQueryClient.java @@ -16,19 +16,6 @@ * specific language governing permissions and limitations * under the License. */ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.pinot.connector.presto.grpc; import java.util.HashMap; diff --git a/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/plugin/metrics/NoopPinotMetricFactory.java b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/plugin/metrics/NoopPinotMetricFactory.java new file mode 100644 index 0000000..b789913 --- /dev/null +++ b/pinot-connectors/prestodb-pinot-dependencies/presto-pinot-driver/src/main/java/org/apache/pinot/connector/presto/plugin/metrics/NoopPinotMetricFactory.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.connector.presto.plugin.metrics; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.pinot.spi.annotations.metrics.MetricsFactory; +import org.apache.pinot.spi.annotations.metrics.PinotMetricsFactory; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.metrics.PinotCounter; +import org.apache.pinot.spi.metrics.PinotGauge; +import org.apache.pinot.spi.metrics.PinotHistogram; +import org.apache.pinot.spi.metrics.PinotJmxReporter; +import org.apache.pinot.spi.metrics.PinotMeter; +import org.apache.pinot.spi.metrics.PinotMetric; +import org.apache.pinot.spi.metrics.PinotMetricName; +import org.apache.pinot.spi.metrics.PinotMetricsRegistry; +import org.apache.pinot.spi.metrics.PinotMetricsRegistryListener; +import org.apache.pinot.spi.metrics.PinotTimer; + + +/** + * This package name has to match the regex pattern: ".*\\.plugin\\.metrics\\..*" + */ +@MetricsFactory +public class NoopPinotMetricFactory implements PinotMetricsFactory { + private PinotMetricsRegistry _pinotMetricsRegistry; + + @Override + public void init(PinotConfiguration pinotConfiguration) { + } + + @Override + public PinotMetricsRegistry getPinotMetricsRegistry() { + if (_pinotMetricsRegistry == null) { + _pinotMetricsRegistry = new NoopPinotMetricsRegistry(); + } + return _pinotMetricsRegistry; + } + + @Override + public PinotMetricName makePinotMetricName(Class<?> aClass, String s) { + return new NoopPinotMetricName(); + } + + @Override + public <T> PinotGauge<T> makePinotGauge(Function<Void, T> function) { + return new NoopPinotGauge<T>(); + } + + @Override + public PinotJmxReporter makePinotJmxReporter(PinotMetricsRegistry pinotMetricsRegistry) { + return new NoopPinotJmxReporter(); + } + + @Override + public String getMetricsFactoryName() { + return "noop"; + } + + public static class NoopPinotMetricsRegistry implements 
PinotMetricsRegistry { + @Override + public void removeMetric(PinotMetricName pinotMetricName) { + } + + @Override + public <T> PinotGauge<T> newGauge(PinotMetricName pinotMetricName, PinotGauge<T> pinotGauge) { + return new NoopPinotGauge<T>(); + } + + @Override + public PinotMeter newMeter(PinotMetricName pinotMetricName, String s, TimeUnit timeUnit) { + return new NoopPinotMeter(); + } + + @Override + public PinotCounter newCounter(PinotMetricName pinotMetricName) { + return new NoopPinotCounter(); + } + + @Override + public PinotTimer newTimer(PinotMetricName pinotMetricName, TimeUnit timeUnit, TimeUnit timeUnit1) { + return new NoopPinotTimer(); + } + + @Override + public PinotHistogram newHistogram(PinotMetricName pinotMetricName, boolean b) { + return new NoopPinotHistogram(); + } + + @Override + public Map<PinotMetricName, PinotMetric> allMetrics() { + return ImmutableMap.of(); + } + + @Override + public void addListener(PinotMetricsRegistryListener pinotMetricsRegistryListener) { + } + + @Override + public Object getMetricsRegistry() { + return this; + } + + @Override + public void shutdown() { + } + } + + private static class NoopPinotJmxReporter implements PinotJmxReporter { + @Override + public void start() { + } + } + + private static class NoopPinotMeter implements PinotMeter { + @Override + public void mark() { + } + + @Override + public void mark(long l) { + } + + @Override + public Object getMetered() { + return null; + } + + @Override + public TimeUnit rateUnit() { + return null; + } + + @Override + public String eventType() { + return null; + } + + @Override + public long count() { + return 0; + } + + @Override + public double fifteenMinuteRate() { + return 0; + } + + @Override + public double fiveMinuteRate() { + return 0; + } + + @Override + public double meanRate() { + return 0; + } + + @Override + public double oneMinuteRate() { + return 0; + } + + @Override + public Object getMetric() { + return null; + } + } + + private static class 
NoopPinotMetricName implements PinotMetricName { + @Override + public Object getMetricName() { + return null; + } + } + + private static class NoopPinotGauge<T> implements PinotGauge<T> { + @Override + public Object getGauge() { + return null; + } + + @Override + public T value() { + return null; + } + + @Override + public Object getMetric() { + return null; + } + } + + private static class NoopPinotCounter implements PinotCounter { + @Override + public Object getCounter() { + return null; + } + + @Override + public Object getMetric() { + return null; + } + } + + private static class NoopPinotTimer implements PinotTimer { + @Override + public Object getMetered() { + return null; + } + + @Override + public TimeUnit rateUnit() { + return null; + } + + @Override + public String eventType() { + return null; + } + + @Override + public long count() { + return 0; + } + + @Override + public double fifteenMinuteRate() { + return 0; + } + + @Override + public double fiveMinuteRate() { + return 0; + } + + @Override + public double meanRate() { + return 0; + } + + @Override + public double oneMinuteRate() { + return 0; + } + + @Override + public Object getMetric() { + return null; + } + + @Override + public void update(long duration, TimeUnit unit) { + } + + @Override + public Object getTimer() { + return null; + } + } + + private static class NoopPinotHistogram implements PinotHistogram { + @Override + public Object getHistogram() { + return null; + } + + @Override + public Object getMetric() { + return null; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org