This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e7393a1ac0 Cleanup some unused code in pinot-query-runtime (#16718)
0e7393a1ac0 is described below

commit 0e7393a1ac06c57fd73377c854d2cdfa7c85d19f
Author: Yash Mayya <[email protected]>
AuthorDate: Sat Aug 30 00:28:38 2025 +0530

    Cleanup some unused code in pinot-query-runtime (#16718)
---
 .../requesthandler/TimeSeriesRequestHandler.java   |  2 +-
 .../pinot/query/mailbox/GrpcSendingMailbox.java    |  4 +-
 .../apache/pinot/query/mailbox/MailboxService.java |  4 +-
 .../query/runtime/executor/OpChainScheduler.java   | 86 ----------------------
 .../runtime/operator/utils/BlockingStream.java     | 47 ------------
 .../query/runtime/plan/MultiStageQueryStats.java   |  4 -
 .../plan/server/ServerPlanRequestUtils.java        | 10 ---
 .../query/service/dispatch/QueryDispatcher.java    |  2 +-
 .../AsyncQueryTimeSeriesDispatchResponse.java      | 57 --------------
 .../operator/EnrichedHashJoinOperatorTest.java     |  1 -
 .../runtime/operator/MailboxSendOperatorTest.java  |  1 -
 .../query/runtime/queries/QueryRunnerTestBase.java |  6 --
 .../runtime/queries/ResourceBasedQueriesTest.java  |  7 --
 .../testutils/MockInstanceDataManagerFactory.java  |  4 -
 14 files changed, 5 insertions(+), 230 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 65e04288438..8835597b151 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -135,7 +135,7 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
 
       tableLevelAccessControlCheck(httpHeaders, dispatchablePlan.getTableNames());
       timeSeriesBlock = _queryDispatcher.submitAndGet(requestContext.getRequestId(), dispatchablePlan,
-          timeSeriesRequest.getTimeout().toMillis(), new HashMap<>(), requestContext);
+          timeSeriesRequest.getTimeout().toMillis(), requestContext);
       return timeSeriesBlock;
     } catch (Exception e) {
       _brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED, 1);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index cd54d7d1ee3..cce43b228f7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -49,7 +49,6 @@ import org.apache.pinot.query.runtime.blocks.SerializedDataBlock;
 import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.segment.spi.memory.DataBuffer;
-import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,8 +72,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
 
   private StreamObserver<MailboxContent> _contentObserver;
 
-  public GrpcSendingMailbox(
-      PinotConfiguration config, String id, ChannelManager channelManager, String hostname, int port, long deadlineMs,
+  public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs,
       StatMap<MailboxSendOperator.StatKey> statMap, int maxByteStringSize) {
     _id = id;
     _channelManager = channelManager;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 92dae478321..7ab607bee02 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -139,8 +139,8 @@ public class MailboxService {
     if (_hostname.equals(hostname) && _port == port) {
       return new InMemorySendingMailbox(mailboxId, this, deadlineMs, statMap);
     } else {
-      return new GrpcSendingMailbox(
-          _config, mailboxId, _channelManager, hostname, port, deadlineMs, statMap, _maxByteStringSize);
+      return new GrpcSendingMailbox(mailboxId, _channelManager, hostname, port, deadlineMs, statMap,
+          _maxByteStringSize);
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
deleted file mode 100644
index b19ea88263f..00000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.executor;
-
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.query.runtime.operator.OpChainId;
-
-
-/**
- * An interface that defines different scheduling strategies to work with the {@link OpChainSchedulerService}.
- */
-@ThreadSafe
-public interface OpChainScheduler {
-  /**
-   * Registers a new OpChain with the scheduler.
-   * @param operatorChain the operator chain to register
-   */
-  void register(OpChain operatorChain);
-
-  /**
-   * When the OpChain is finished, error or otherwise, deregister is called for it so the scheduler can do any required
-   * cleanup. After an OpChain is de-registered, the scheduler service will never call any other method for it.
-   * However, the {@link #onDataAvailable} callback may be called even after an OpChain is de-registered, and the
-   * scheduler should handle that scenario.
-   * @param operatorChain an operator chain that is finished (error or otherwise).
-   */
-  void deregister(OpChain operatorChain);
-
-  /**
-   * Used by {@link OpChainSchedulerService} to indicate that a given OpChain can be suspended until it receives some
-   * data. Note that this method is only used by the scheduler service to "indicate" that an OpChain can be suspended.
-   * The decision on whether to actually suspend or not can be taken by the scheduler.
-   */
-  void yield(OpChain opChain);
-
-  /**
-   * A callback called whenever data is received for the given opChain. This can be used by the scheduler
-   * implementations to re-scheduled suspended OpChains. This method may be called for an OpChain that has not yet
-   * been scheduled, or an OpChain that has already been de-registered.
-   * @param opChainId the {@link OpChain} ID
-   */
-  void onDataAvailable(OpChainId opChainId);
-
-  /**
-   * Returns an OpChain that is ready to be run by {@link OpChainSchedulerService}, waiting for the given time if
-   * there are no such OpChains ready yet. Will return null if there's no ready OpChains even after the specified time.
-   *
-   * @param time non-negative value that determines the time the scheduler will wait for new OpChains to be ready.
-   * @param timeUnit TimeUnit for the await time.
-   * @return a non-null OpChain that's ready to be run, or null if there's no OpChain ready even after waiting for the
-   *         given time.
-   * @throws InterruptedException if the wait for a ready OpChain was interrupted.
-   */
-  @Nullable
-  OpChain next(long time, TimeUnit timeUnit)
-      throws InterruptedException;
-
-  /**
-   * @return the number of operator chains registered with the scheduler
-   */
-  int size();
-
-  /**
-   * TODO: Figure out shutdown flow in context of graceful shutdown.
-   */
-  void shutdownNow();
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingStream.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingStream.java
deleted file mode 100644
index effc5b971d0..00000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingStream.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.operator.utils;
-
-/**
- * An interface that represents an abstract blocking stream of elements that can be consumed.
- *
- * These streams are designed to be consumed by a single thread and do not support null elements.
- *
- * @param <E> The type of the elements, usually a {@link org.apache.pinot.query.runtime.blocks.MseBlock}
- */
-public interface BlockingStream<E> {
-  /**
-   * The id of the stream. Mostly used for logging.
-   *
-   * Implementations of this method must be thread safe.
-   */
-  Object getId();
-
-  /**
-   * Returns the next element on the stream, blocking if there is no element ready.
-   */
-  E get();
-
-  /**
-   * Cancels the stream.
-   *
-   * This method can be called by any thread.
-   */
-  void cancel();
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
index 9ff54f01f55..72bbf3727a0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
@@ -471,10 +471,6 @@ public class MultiStageQueryStats {
       return _operatorTypes.get(index);
     }
 
-    public MultiStageOperator.Type getLastType() {
-      return _operatorTypes.get(_operatorTypes.size() - 1);
-    }
-
     public StatMap<?> getLastOperatorStats() {
       return _operatorStats.get(_operatorStats.size() - 1);
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index c83b7e4358e..670c79aa7a5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -90,16 +90,6 @@ public class ServerPlanRequestUtils {
         }, false, rowFilters);
   }
 
-  public static OpChain compileLeafStage(
-      OpChainExecutionContext executionContext,
-      StagePlan stagePlan,
-      QueryExecutor leafQueryExecutor,
-      ExecutorService executorService) {
-    return compileLeafStage(executionContext, stagePlan, leafQueryExecutor, executorService,
-        (planNode, multiStageOperator) -> {
-        }, false, null);
-  }
-
   /**
    * main entry point for compiling leaf-stage {@link StagePlan}.
    *
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 2812a7806a9..67eb92fef07 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -698,7 +698,7 @@ public class QueryDispatcher {
   }
 
   public TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs,
-      Map<String, String> queryOptions, RequestContext requestContext)
+      RequestContext requestContext)
       throws Exception {
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
     BaseTimeSeriesPlanNode brokerFragment = plan.getBrokerFragment();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java
deleted file mode 100644
index b00919d3bf3..00000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/AsyncQueryTimeSeriesDispatchResponse.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.service.dispatch.timeseries;
-
-import javax.annotation.Nullable;
-import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.query.routing.QueryServerInstance;
-
-
-/**
- * Response of the broker dispatch to the server.
- * TODO: This shouldn't exist and we should re-use AsyncQueryDispatchResponse. TBD as part of multi-stage
- *   engine integration.
- */
-public class AsyncQueryTimeSeriesDispatchResponse {
-  private final QueryServerInstance _serverInstance;
-  private final Worker.TimeSeriesResponse _queryResponse;
-  private final Throwable _throwable;
-
-  public AsyncQueryTimeSeriesDispatchResponse(QueryServerInstance serverInstance,
-      @Nullable Worker.TimeSeriesResponse queryResponse,
-      @Nullable Throwable throwable) {
-    _serverInstance = serverInstance;
-    _queryResponse = queryResponse;
-    _throwable = throwable;
-  }
-
-  public QueryServerInstance getServerInstance() {
-    return _serverInstance;
-  }
-
-  @Nullable
-  public Worker.TimeSeriesResponse getQueryResponse() {
-    return _queryResponse;
-  }
-
-  @Nullable
-  public Throwable getThrowable() {
-    return _throwable;
-  }
-}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/EnrichedHashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/EnrichedHashJoinOperatorTest.java
index f7da265910d..a3134e7163b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/EnrichedHashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/EnrichedHashJoinOperatorTest.java
@@ -36,7 +36,6 @@ import static org.testng.Assert.assertTrue;
 
 
 public class EnrichedHashJoinOperatorTest {
-  private AutoCloseable _mocks;
   private MultiStageOperator _leftInput;
   private MultiStageOperator _rightInput;
   private static final DataSchema DEFAULT_CHILD_SCHEMA = new DataSchema(new String[]{"int_col", "string_col"},
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index ee028d7ac71..ffc98d2e12c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -98,7 +98,6 @@ public class MailboxSendOperatorTest {
 
     // Then:
     assertTrue(block.isError(), "expected error block to propagate");
-    ErrorMseBlock errorMseBlock = (ErrorMseBlock) block;
     ArgumentCaptor<MseBlock.Eos> captor = ArgumentCaptor.forClass(MseBlock.Eos.class);
     verify(_exchange).send(captor.capture(), anyList());
     assertTrue(captor.getValue().isError(), "expected to send error block to exchange");
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index dfb88fe10c8..c324fdda02d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -76,7 +76,6 @@ import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.h2.jdbc.JdbcArray;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
@@ -453,11 +452,6 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
 
   protected Connection _h2Connection;
 
-  protected Connection getH2Connection() {
-    assertNotNull(_h2Connection, "H2 Connection has not been initialized");
-    return _h2Connection;
-  }
-
   protected void setH2Connection()
       throws Exception {
     assertNull(_h2Connection);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 93eff5c5ac8..29ee3a0affe 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -580,11 +580,4 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
     }
     return testCaseMap;
   }
-
-  private static Object extractExtraProps(Map<String, Object> extraProps, String propKey) {
-    if (extraProps == null) {
-      return null;
-    }
-    return extraProps.getOrDefault(propKey, null);
-  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
index 3e75dfb1007..f3fb219e8c6 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -138,10 +138,6 @@ public class MockInstanceDataManagerFactory {
     return _registeredSchemaMap;
   }
 
-  public Map<String, Schema> buildSchemaMap() {
-    return _schemaMap;
-  }
-
   public Map<String, List<GenericRow>> buildTableRowsMap() {
     return _tableRowsMap;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to