This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0e7393a1ac0 Cleanup some unused code in pinot-query-runtime (#16718)
0e7393a1ac0 is described below
commit 0e7393a1ac06c57fd73377c854d2cdfa7c85d19f
Author: Yash Mayya <[email protected]>
AuthorDate: Sat Aug 30 00:28:38 2025 +0530
Cleanup some unused code in pinot-query-runtime (#16718)
---
.../requesthandler/TimeSeriesRequestHandler.java | 2 +-
.../pinot/query/mailbox/GrpcSendingMailbox.java | 4 +-
.../apache/pinot/query/mailbox/MailboxService.java | 4 +-
.../query/runtime/executor/OpChainScheduler.java | 86 ----------------------
.../runtime/operator/utils/BlockingStream.java | 47 ------------
.../query/runtime/plan/MultiStageQueryStats.java | 4 -
.../plan/server/ServerPlanRequestUtils.java | 10 ---
.../query/service/dispatch/QueryDispatcher.java | 2 +-
.../AsyncQueryTimeSeriesDispatchResponse.java | 57 --------------
.../operator/EnrichedHashJoinOperatorTest.java | 1 -
.../runtime/operator/MailboxSendOperatorTest.java | 1 -
.../query/runtime/queries/QueryRunnerTestBase.java | 6 --
.../runtime/queries/ResourceBasedQueriesTest.java | 7 --
.../testutils/MockInstanceDataManagerFactory.java | 4 -
14 files changed, 5 insertions(+), 230 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 65e04288438..8835597b151 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -135,7 +135,7 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
tableLevelAccessControlCheck(httpHeaders,
dispatchablePlan.getTableNames());
timeSeriesBlock =
_queryDispatcher.submitAndGet(requestContext.getRequestId(), dispatchablePlan,
- timeSeriesRequest.getTimeout().toMillis(), new HashMap<>(),
requestContext);
+ timeSeriesRequest.getTimeout().toMillis(), requestContext);
return timeSeriesBlock;
} catch (Exception e) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED,
1);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index cd54d7d1ee3..cce43b228f7 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -49,7 +49,6 @@ import
org.apache.pinot.query.runtime.blocks.SerializedDataBlock;
import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.segment.spi.memory.DataBuffer;
-import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,8 +72,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
private StreamObserver<MailboxContent> _contentObserver;
- public GrpcSendingMailbox(
- PinotConfiguration config, String id, ChannelManager channelManager,
String hostname, int port, long deadlineMs,
+ public GrpcSendingMailbox(String id, ChannelManager channelManager, String
hostname, int port, long deadlineMs,
StatMap<MailboxSendOperator.StatKey> statMap, int maxByteStringSize) {
_id = id;
_channelManager = channelManager;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 92dae478321..7ab607bee02 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -139,8 +139,8 @@ public class MailboxService {
if (_hostname.equals(hostname) && _port == port) {
return new InMemorySendingMailbox(mailboxId, this, deadlineMs, statMap);
} else {
- return new GrpcSendingMailbox(
- _config, mailboxId, _channelManager, hostname, port, deadlineMs,
statMap, _maxByteStringSize);
+ return new GrpcSendingMailbox(mailboxId, _channelManager, hostname,
port, deadlineMs, statMap,
+ _maxByteStringSize);
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
deleted file mode 100644
index b19ea88263f..00000000000
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.executor;
-
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.query.runtime.operator.OpChainId;
-
-
-/**
- * An interface that defines different scheduling strategies to work with the
{@link OpChainSchedulerService}.
- */
-@ThreadSafe
-public interface OpChainScheduler {
- /**
- * Registers a new OpChain with the scheduler.
- * @param operatorChain the operator chain to register
- */
- void register(OpChain operatorChain);
-
- /**
- * When the OpChain is finished, error or otherwise, deregister is called
for it so the scheduler can do any required
- * cleanup. After an OpChain is de-registered, the scheduler service will
never call any other method for it.
- * However, the {@link #onDataAvailable} callback may be called even after
an OpChain is de-registered, and the
- * scheduler should handle that scenario.
- * @param operatorChain an operator chain that is finished (error or
otherwise).
- */
- void deregister(OpChain operatorChain);
-
- /**
- * Used by {@link OpChainSchedulerService} to indicate that a given OpChain
can be suspended until it receives some
- * data. Note that this method is only used by the scheduler service to
"indicate" that an OpChain can be suspended.
- * The decision on whether to actually suspend or not can be taken by the
scheduler.
- */
- void yield(OpChain opChain);
-
- /**
- * A callback called whenever data is received for the given opChain. This
can be used by the scheduler
- * implementations to re-scheduled suspended OpChains. This method may be
called for an OpChain that has not yet
- * been scheduled, or an OpChain that has already been de-registered.
- * @param opChainId the {@link OpChain} ID
- */
- void onDataAvailable(OpChainId opChainId);
-
- /**
- * Returns an OpChain that is ready to be run by {@link
OpChainSchedulerService}, waiting for the given time if
- * there are no such OpChains ready yet. Will return null if there's no
ready OpChains even after the specified time.
- *
- * @param time non-negative value that determines the time the scheduler
will wait for new OpChains to be ready.
- * @param timeUnit TimeUnit for the await time.
- * @return a non-null OpChain that's ready to be run, or null if there's no
OpChain ready even after waiting for the
- * given time.
- * @throws InterruptedException if the wait for a ready OpChain was
interrupted.
- */
- @Nullable
- OpChain next(long time, TimeUnit timeUnit)
- throws InterruptedException;
-
- /**
- * @return the number of operator chains registered with the scheduler
- */
- int size();
-
- /**
- * TODO: Figure out shutdown flow in context of graceful shutdown.
- */
- void shutdownNow();
-}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingStream.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingStream.java
deleted file mode 100644
index effc5b971d0..00000000000
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingStream.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.operator.utils;
-
-/**
- * An interface that represents an abstract blocking stream of elements that
can be consumed.
- *
- * These streams are designed to be consumed by a single thread and do not
support null elements.
- *
- * @param <E> The type of the elements, usually a {@link
org.apache.pinot.query.runtime.blocks.MseBlock}
- */
-public interface BlockingStream<E> {
- /**
- * The id of the stream. Mostly used for logging.
- *
- * Implementations of this method must be thread safe.
- */
- Object getId();
-
- /**
- * Returns the next element on the stream, blocking if there is no element
ready.
- */
- E get();
-
- /**
- * Cancels the stream.
- *
- * This method can be called by any thread.
- */
- void cancel();
-}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
index 9ff54f01f55..72bbf3727a0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
@@ -471,10 +471,6 @@ public class MultiStageQueryStats {
return _operatorTypes.get(index);
}
- public MultiStageOperator.Type getLastType() {
- return _operatorTypes.get(_operatorTypes.size() - 1);
- }
-
public StatMap<?> getLastOperatorStats() {
return _operatorStats.get(_operatorStats.size() - 1);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index c83b7e4358e..670c79aa7a5 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -90,16 +90,6 @@ public class ServerPlanRequestUtils {
}, false, rowFilters);
}
- public static OpChain compileLeafStage(
- OpChainExecutionContext executionContext,
- StagePlan stagePlan,
- QueryExecutor leafQueryExecutor,
- ExecutorService executorService) {
- return compileLeafStage(executionContext, stagePlan, leafQueryExecutor,
executorService,
- (planNode, multiStageOperator) -> {
- }, false, null);
- }
-
/**
* main entry point for compiling leaf-stage {@link StagePlan}.
*
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 2812a7806a9..67eb92fef07 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -698,7 +698,7 @@ public class QueryDispatcher {
}
public TimeSeriesBlock submitAndGet(long requestId,
TimeSeriesDispatchablePlan plan, long timeoutMs,
- Map<String, String> queryOptions, RequestContext requestContext)
+ RequestContext requestContext)
throws Exception {
long deadlineMs = System.currentTimeMillis() + timeoutMs;
BaseTimeSeriesPlanNode brokerFragment = plan.getBrokerFragment();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java
deleted file mode 100644
index b00919d3bf3..00000000000
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.service.dispatch.timeseries;
-
-import javax.annotation.Nullable;
-import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.query.routing.QueryServerInstance;
-
-
-/**
- * Response of the broker dispatch to the server.
- * TODO: This shouldn't exist and we should re-use AsyncQueryDispatchResponse.
TBD as part of multi-stage
- * engine integration.
- */
-public class AsyncQueryTimeSeriesDispatchResponse {
- private final QueryServerInstance _serverInstance;
- private final Worker.TimeSeriesResponse _queryResponse;
- private final Throwable _throwable;
-
- public AsyncQueryTimeSeriesDispatchResponse(QueryServerInstance
serverInstance,
- @Nullable Worker.TimeSeriesResponse queryResponse,
- @Nullable Throwable throwable) {
- _serverInstance = serverInstance;
- _queryResponse = queryResponse;
- _throwable = throwable;
- }
-
- public QueryServerInstance getServerInstance() {
- return _serverInstance;
- }
-
- @Nullable
- public Worker.TimeSeriesResponse getQueryResponse() {
- return _queryResponse;
- }
-
- @Nullable
- public Throwable getThrowable() {
- return _throwable;
- }
-}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/EnrichedHashJoinOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/EnrichedHashJoinOperatorTest.java
index f7da265910d..a3134e7163b 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/EnrichedHashJoinOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/EnrichedHashJoinOperatorTest.java
@@ -36,7 +36,6 @@ import static org.testng.Assert.assertTrue;
public class EnrichedHashJoinOperatorTest {
- private AutoCloseable _mocks;
private MultiStageOperator _leftInput;
private MultiStageOperator _rightInput;
private static final DataSchema DEFAULT_CHILD_SCHEMA = new DataSchema(new
String[]{"int_col", "string_col"},
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index ee028d7ac71..ffc98d2e12c 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -98,7 +98,6 @@ public class MailboxSendOperatorTest {
// Then:
assertTrue(block.isError(), "expected error block to propagate");
- ErrorMseBlock errorMseBlock = (ErrorMseBlock) block;
ArgumentCaptor<MseBlock.Eos> captor =
ArgumentCaptor.forClass(MseBlock.Eos.class);
verify(_exchange).send(captor.capture(), anyList());
assertTrue(captor.getValue().isError(), "expected to send error block to
exchange");
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index dfb88fe10c8..c324fdda02d 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -76,7 +76,6 @@ import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.h2.jdbc.JdbcArray;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -453,11 +452,6 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
protected Connection _h2Connection;
- protected Connection getH2Connection() {
- assertNotNull(_h2Connection, "H2 Connection has not been initialized");
- return _h2Connection;
- }
-
protected void setH2Connection()
throws Exception {
assertNull(_h2Connection);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 93eff5c5ac8..29ee3a0affe 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -580,11 +580,4 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
}
return testCaseMap;
}
-
- private static Object extractExtraProps(Map<String, Object> extraProps,
String propKey) {
- if (extraProps == null) {
- return null;
- }
- return extraProps.getOrDefault(propKey, null);
- }
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
index 3e75dfb1007..f3fb219e8c6 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -138,10 +138,6 @@ public class MockInstanceDataManagerFactory {
return _registeredSchemaMap;
}
- public Map<String, Schema> buildSchemaMap() {
- return _schemaMap;
- }
-
public Map<String, List<GenericRow>> buildTableRowsMap() {
return _tableRowsMap;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]