yashmayya commented on code in PR #15180: URL: https://github.com/apache/pinot/pull/15180#discussion_r1998067113
########## pinot-query-planner/src/main/java/org/apache/pinot/query/function/InternalMseFunctions.java: ########## @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.function; + +import org.apache.pinot.query.MseWorkerThreadContext; +import org.apache.pinot.spi.annotations.ScalarFunction; + + +public class InternalMseFunctions { Review Comment: IMO we can move all these functions to the common `InternalFunctions` class in the `org.apache.pinot.common.function.scalar` package - it seems slightly odd to have them here. The Javadocs you've added already provide enough context and differentiation. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java: ########## @@ -70,7 +71,13 @@ public void submit(Worker.QueryRequest request, QueryServerInstance virtualServe } public void cancel(long requestId) { - Worker.CancelRequest cancelRequest = Worker.CancelRequest.newBuilder().setRequestId(requestId).build(); + String cid = QueryThreadContext.isInitialized() && QueryThreadContext.getCid() != null + ? 
QueryThreadContext.getCid() + : Long.toString(requestId); + Worker.CancelRequest cancelRequest = Worker.CancelRequest.newBuilder() + .setRequestId(requestId) + .setCid(cid) + .build(); Review Comment: > The latter is needed to be able to set the cid on the QueryThreadContexts of the servers that are going to cancel the query. They don't need the cid to cancel the query, but we want to include that in the cancellation logs for sure. Thanks, this was what I missed earlier. ########## pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java: ########## @@ -0,0 +1,614 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spi.query; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import javax.annotation.Nullable; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.executor.DecoratorExecutorService; +import org.apache.pinot.spi.trace.LoggerConstants; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * The {@code QueryThreadContext} class is a thread-local context for storing common query-related information + * associated to the current thread. + * + * <p>It is used to pass information between different layers of the query execution stack without changing the + * method signatures. This is also used to populate the {@link MDC} context for logging. + * + * Use {@link #open()} to initialize the empty context. As any other {@link AutoCloseable} object, it should be used + * within a try-with-resources block to ensure the context is properly closed and removed from the thread-local storage. + * + * Sometimes it is necessary to copy the state of the {@link QueryThreadContext} from one thread to another. In this + * case, use the {@link #createMemento()} method to capture the state of the {@link QueryThreadContext} in the current + * thread and then use the {@link #open(Memento)} method to initialize the context in the target thread with the state + * captured in the {@link Memento} object. The API may be a bit cumbersome, but it ensures that the state is always + * copied between threads in a safe way and makes it impossible to use the {@link QueryThreadContext} from another + * thread by mistake. 
+ * + * It is guaranteed that all server and broker threads running SSE and MSE will have this context initialized as soon + * as the request is received. Ingestion threads that use the query execution stack will also have this context + * initialized. + */ +public class QueryThreadContext { + private static final Logger LOGGER = LoggerFactory.getLogger(QueryThreadContext.class); + private static final ThreadLocal<Instance> THREAD_LOCAL = new ThreadLocal<>(); + public static volatile boolean _strictMode = false; + private static final FakeInstance FAKE_INSTANCE = new FakeInstance(); + + static { + // This is a hack to know if assertions are enabled or not + boolean assertEnabled = false; + //CHECKSTYLE:OFF + assert assertEnabled = true; + //CHECKSTYLE:ON + _strictMode = assertEnabled; + } + + /** + * Private constructor to prevent instantiation. + * + * Use {@link #open()} to initialize the context instead. + */ + private QueryThreadContext() { + } + + /** + * Sets the strict mode of the {@link QueryThreadContext} from the given configuration. + */ + public static void onStartup(PinotConfiguration conf) { + String mode = conf.getProperty(CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE); + if ("strict".equalsIgnoreCase(mode)) { + _strictMode = true; + } + if (mode != null && !mode.isEmpty()) { + throw new IllegalArgumentException("Invalid value '" + mode + "' for " + + CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE + ". Expected 'strict' or empty"); + } + } + + /** + * Returns {@code true} if the {@link QueryThreadContext} is in strict mode. + * + * In strict mode, if the {@link QueryThreadContext} is not initialized, an {@link IllegalStateException} will be + * thrown when setter and getter methods are used. + * In non-strict mode, a warning will be logged and the fake instance will be returned. 
+ * + * @see #onStartup(PinotConfiguration) + */ + public static boolean isStrictMode() { + return _strictMode; + } + + private static Instance get() { + Instance instance = THREAD_LOCAL.get(); + if (instance == null) { + String errorMessage = "QueryThreadContext is not initialized"; + if (_strictMode) { + LOGGER.error(errorMessage); + throw new IllegalStateException("QueryThreadContext is not initialized"); + } else { + LOGGER.debug(errorMessage); + // in non-strict mode, return the fake instance + return FAKE_INSTANCE; + } + } + return instance; + } + + /** + * Returns {@code true} if the {@link QueryThreadContext} is initialized in the current thread. + * + * Initializing the context means that the {@link #open()} method was called and the returned object is not closed + * yet. + */ + public static boolean isInitialized() { + return THREAD_LOCAL.get() != null; + } + + /** + * Captures the state of the {@link QueryThreadContext} in the current thread. + * + * This method is used to capture the state of the {@link QueryThreadContext} in the current thread so that it can be + * restored later in another thread. + * + * @return a {@link Memento} object that captures the state of the {@link QueryThreadContext} + * @throws IllegalStateException if the {@link QueryThreadContext} is not initialized + * @see #open(Memento) + */ + public static Memento createMemento() { + return new Memento(get()); + } + + /** + * Initializes the {@link QueryThreadContext} with default values. + * + * This method will throw an {@link IllegalStateException} if the {@link QueryThreadContext} is already initialized. + * That indicates an error in the code. Older context must be closed before opening a new one. + * + * @return an {@link AutoCloseable} object that should be used within a try-with-resources block + * @throws IllegalStateException if the {@link QueryThreadContext} is already initialized. 
+ */ + public static CloseableContext open() { + return open(null); + } + + public static CloseableContext openFromRequestMetadata(Map<String, String> requestMetadata) { + CloseableContext open = open(); + String cid = requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID); + long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID)); + if (cid == null) { + cid = Long.toString(requestId); + } + QueryThreadContext.setIds(requestId, cid); + long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS)); + long startTimeMs = System.currentTimeMillis(); + QueryThreadContext.setStartTimeMs(startTimeMs); + QueryThreadContext.setDeadlineMs(startTimeMs + timeoutMs); + + return open; + } + + /** + * Initializes the {@link QueryThreadContext} with the state captured in the given {@link Memento} object, if any. + * + * This method will throw an {@link IllegalStateException} if the {@link QueryThreadContext} is already initialized. + * That indicates an error in the code. Older context must be closed before opening a new one. + * + * Values that were set in the {@link Memento} object will be set in the {@link QueryThreadContext} and therefore + * they couldn't be set again in the current thread (at least until the returned {@link AutoCloseable} is closed). + * + * @param memento the {@link Memento} object to capture the state from + * (if {@code null}, the context will be initialized with default values) + * @return an {@link AutoCloseable} object that should be used within a try-with-resources block + * @throws IllegalStateException if the {@link QueryThreadContext} is already initialized. 
+ */ + public static CloseableContext open(@Nullable Memento memento) { + if (THREAD_LOCAL.get() != null) { + String errorMessage = "QueryThreadContext is already initialized"; + if (_strictMode) { + LOGGER.error(errorMessage); + throw new IllegalStateException("QueryThreadContext is already initialized"); + } else { + LOGGER.debug(errorMessage); + return FAKE_INSTANCE; + } + } + + Instance context = new Instance(); + if (memento != null) { + context.setStartTimeMs(memento._startTimeMs); + context.setDeadlineMs(memento._deadlineMs); + context.setBrokerId(memento._brokerId); + context.setRequestId(memento._requestId); + context.setCid(memento._cid); + context.setSql(memento._sql); + context.setQueryEngine(memento._queryEngine); + } + + THREAD_LOCAL.set(context); + + return context; + } + + /** + * Returns a new {@link ExecutorService} whose tasks will be executed with the {@link QueryThreadContext} initialized + * with the state of the thread submitting the tasks. + * + * @param executorService the {@link ExecutorService} to decorate + */ + public static ExecutorService contextAwareExecutorService(ExecutorService executorService) { + return contextAwareExecutorService(executorService, QueryThreadContext::createMemento); + } + + /** + * Returns a new {@link ExecutorService} whose tasks will be executed with the {@link QueryThreadContext} initialized + * with the state captured in the given {@link Memento} object. 
+ * + * @param executorService the {@link ExecutorService} to decorate + * @param mementoSupplier a supplier that provides the {@link Memento} object to capture the state from + */ + public static ExecutorService contextAwareExecutorService( + ExecutorService executorService, + Supplier<Memento> mementoSupplier) { + return new DecoratorExecutorService(executorService) { + @Override + protected <T> Callable<T> decorate(Callable<T> task) { + Memento memento = mementoSupplier.get(); + return () -> { + try (CloseableContext ignored = open(memento)) { + return task.call(); + } + }; + } + + @Override + protected Runnable decorate(Runnable task) { + Memento memento = mementoSupplier.get(); + return () -> { + try (CloseableContext ignored = open(memento)) { + task.run(); + } + }; + } + }; + } + + /** + * Returns the start time of the query in milliseconds. + * + * The default value of 0 means the start time is not set. + * @throws IllegalStateException if the {@link QueryThreadContext} is not initialized + */ + public static long getStartTimeMs() { + return get().getStartTimeMs(); + } + + /** + * Sets the start time of the query in milliseconds since epoch. + * + * The start time can only be set once. + * @throws IllegalStateException if start time is already set or if the {@link QueryThreadContext} is not initialized + */ + public static void setStartTimeMs(long startTimeMs) { + get().setStartTimeMs(startTimeMs); + } + + /** + * Returns the deadline time of the query in milliseconds since epoch. + * + * The default value of 0 means the deadline is not set. + * @throws IllegalStateException if the {@link QueryThreadContext} is not initialized + */ + public static long getDeadlineMs() { + return get().getDeadlineMs(); + } + + /** + * Sets the deadline time of the query in milliseconds since epoch. + * + * The deadline can only be set once. 
+ * @throws IllegalStateException if deadline is already set or if the {@link QueryThreadContext} is not initialized + */ + public static void setDeadlineMs(long deadlineMs) { + get().setDeadlineMs(deadlineMs); + } + + /** + * Returns the broker id of the query. + * + * @throws IllegalStateException if the {@link QueryThreadContext} is not initialized + */ + public static String getBrokerId() { + return get().getBrokerId(); + } + + /** + * Sets the broker id of the query. + * + * The broker id can only be set once. + * @throws IllegalStateException if broker id is already set or if the {@link QueryThreadContext} is not initialized + */ + public static void setBrokerId(String brokerId) { + get().setBrokerId(brokerId); + } + + /** + * Returns the request id of the query. + * + * The request id is used to identify query phases across different systems. + * Contrary to the correlation id, a single logical query can have multiple request ids. + * This is because the request id is changed at different stages of the query (e.g. real-time and offline parts of + * the query or different leaf operations in MSE). + * + * Also remember that neither request nor correlation id are guaranteed to be unique. + * + * The default value of 0 means the request id is not set. + * @throws IllegalStateException if the {@link QueryThreadContext} is not initialized + */ + public static long getRequestId() { + return get().getRequestId(); + } + + /** + * Returns the correlation id of the query. + * + * Correlation id is used to track queries across different systems. + * This id can be supplied by the client or generated by the system (usually the broker). It is not guaranteed to be + * unique. Customers can use the same correlation id for multiple queries to track a single logical workflow on their Review Comment: ```suggestion * unique. 
Users can use the same correlation id for multiple queries to track a single logical workflow on their ``` ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryThreadContextIntegrationTest.java: ########## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import java.io.File; +import java.util.List; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class QueryThreadContextIntegrationTest extends BaseClusterIntegrationTest + implements ExplainIntegrationTestTrait { + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(2); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + String property = CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN; + brokerConf.setProperty(property, "true"); + } + + @Test(dataProvider = "useBothQueryEngines") + public void testReqIdOnServer(boolean useMse) + throws Exception { + setUseMultiStageQueryEngine(useMse); + JsonNode jsonNode = 
postQuery("SELECT reqId(Dest), count(*) " + + "FROM mytable " + + "GROUP BY 1"); + DocumentContext parsed = JsonPath.parse(jsonNode.toString()); + Assert.assertEquals(parsed.read("$.numRowsResultSet", Integer.class), 1, "Unexpected number of rows"); + Assert.assertEquals(parsed.read("$.resultTable.rows[0][1]", Integer.class), 115545, "Unexpected count"); + } + + @Test(dataProvider = "useBothQueryEngines") + public void testReqIdOnSimplification(boolean useMse) Review Comment: nit: why not name this `testReqIdOnBroker` instead, similar to the server test above? With both query engines, this is guaranteed to be executed in the broker (`CompileTimeFunctionsInvoker` for SSE and `PinotEvaluateLiteralRule` for MSE). ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryThreadContextIntegrationTest.java: ########## @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import java.io.File; +import java.util.List; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class QueryThreadContextIntegrationTest extends BaseClusterIntegrationTest + implements ExplainIntegrationTestTrait { + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(2); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + String property = CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN; + brokerConf.setProperty(property, "true"); + } + + @Test(dataProvider = "useBothQueryEngines") + public void testReqIdOnServer(boolean useMse) + throws Exception { + setUseMultiStageQueryEngine(useMse); + JsonNode jsonNode = 
postQuery("SELECT reqId(Dest), count(*) " + + "FROM mytable " + + "GROUP BY 1"); + DocumentContext parsed = JsonPath.parse(jsonNode.toString()); + Assert.assertEquals(parsed.read("$.numRowsResultSet", Integer.class), 1, "Unexpected number of rows"); + Assert.assertEquals(parsed.read("$.resultTable.rows[0][1]", Integer.class), 115545, "Unexpected count"); + } + + @Test(dataProvider = "useBothQueryEngines") + public void testReqIdOnSimplification(boolean useMse) + throws Exception { + setUseMultiStageQueryEngine(useMse); + JsonNode jsonNode = postQuery("SELECT reqId('cte'), count(*) " + + "FROM mytable " + + "GROUP BY 1"); + DocumentContext parsed = JsonPath.parse(jsonNode.toString()); + Assert.assertEquals(parsed.read("$.numRowsResultSet", Integer.class), 1, "Unexpected number of rows"); + Assert.assertEquals(parsed.read("$.resultTable.rows[0][1]", Integer.class), 115545, "Unexpected count"); + } Review Comment: Why no assertions on the request ID returned here? We could check that it's a non-empty value and maybe a valid long as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org