This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0171f99ee1 [multistage] restructure runner test (#9489) 0171f99ee1 is described below commit 0171f99ee1feaeb11bd84e002fc4797991a07d39 Author: Rong Rong <ro...@apache.org> AuthorDate: Thu Sep 29 19:16:44 2022 -0700 [multistage] restructure runner test (#9489) * fix dispatcher/server shutdown * fix lint Co-authored-by: Rong Rong <ro...@startree.ai> --- .../QueryRunnerTest.java => QueryTestSet.java} | 146 +---------------- .../pinot/query/runtime/QueryRunnerTest.java | 176 +-------------------- .../pinot/query/runtime/QueryRunnerTestBase.java | 3 +- .../pinot/query/service/QueryDispatcherTest.java | 18 +-- .../pinot/query/service/QueryServerTest.java | 23 +-- 5 files changed, 19 insertions(+), 347 deletions(-) diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java similarity index 66% copy from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java copy to pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java index 872eec4313..31d1670861 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java @@ -16,136 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.runtime; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import org.apache.pinot.core.transport.ServerInstance; -import org.apache.pinot.query.planner.QueryPlan; -import org.apache.pinot.query.planner.stage.MailboxReceiveNode; -import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; -import org.apache.pinot.query.runtime.plan.DistributedStagePlan; -import org.apache.pinot.query.service.QueryDispatcher; -import org.testng.Assert; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - - -public class QueryRunnerTest extends QueryRunnerTestBase { - - @Test(dataProvider = "testDataWithSqlToFinalRowCount") - public void testSqlWithFinalRowCountChecker(String sql, int expectedRows) - throws Exception { - List<Object[]> resultRows = queryRunner(sql); - Assert.assertEquals(resultRows.size(), expectedRows); - } - - @Test(dataProvider = "testDataWithSql") - public void testSqlWithH2Checker(String sql) - throws Exception { - List<Object[]> resultRows = queryRunner(sql); - // query H2 for data - List<Object[]> expectedRows = queryH2(sql); - compareRowEquals(resultRows, expectedRows); - } +package org.apache.pinot.query; - private List<Object[]> queryRunner(String sql) { - QueryPlan queryPlan = _queryEnvironment.planQuery(sql); - Map<String, String> requestMetadataMap = - ImmutableMap.of("REQUEST_ID", String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong())); - MailboxReceiveOperator mailboxReceiveOperator = null; - for (int stageId : queryPlan.getStageMetadataMap().keySet()) { - if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) { - MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(stageId); - mailboxReceiveOperator = QueryDispatcher.createReduceStageOperator(_mailboxService, - queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), - Long.parseLong(requestMetadataMap.get("REQUEST_ID")), reduceNode.getSenderStageId(), - reduceNode.getDataSchema(), "localhost", _reducerGrpcPort); - } else { - for (ServerInstance serverInstance : queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) { - DistributedStagePlan distributedStagePlan = - QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId, serverInstance); - _servers.get(serverInstance).processQuery(distributedStagePlan, requestMetadataMap); - } - } - } - Preconditions.checkNotNull(mailboxReceiveOperator); - return QueryDispatcher.toResultTable(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator), - queryPlan.getQueryResultFields()).getRows(); - } +import org.testng.annotations.DataProvider; - private List<Object[]> queryH2(String sql) - throws Exception { - Statement h2statement = _h2Connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - h2statement.execute(sql); - ResultSet h2ResultSet = h2statement.getResultSet(); - int columnCount = h2ResultSet.getMetaData().getColumnCount(); - List<Object[]> result = new ArrayList<>(); - while (h2ResultSet.next()) { - Object[] row = new Object[columnCount]; - for (int i = 0; i < columnCount; i++) { - row[i] = h2ResultSet.getObject(i + 1); - } - result.add(row); - } - return result; - } - private void compareRowEquals(List<Object[]> resultRows, List<Object[]> expectedRows) { - Assert.assertEquals(resultRows.size(), expectedRows.size()); - - Comparator<Object> valueComp = (l, r) -> { - if (l == null && r == null) { - return 0; - } else if (l == null) { - return -1; - } else if (r == null) { - return 1; - } - if (l instanceof Integer) { - return Integer.compare((Integer) l, ((Number) r).intValue()); - } else if (l instanceof Long) { - return Long.compare((Long) l, ((Number) r).longValue()); - } else if (l instanceof Float) { - return Float.compare((Float) l, ((Number) r).floatValue()); - } else if (l instanceof Double) { - return Double.compare((Double) l, ((Number) r).doubleValue()); - } else if (l instanceof String) { - return ((String) l).compareTo((String) r); - } else { - throw new RuntimeException("non supported type"); - } - }; - Comparator<Object[]> rowComp = (l, r) -> { - int cmp = 0; - for (int i = 0; i < l.length; i++) { - cmp = valueComp.compare(l[i], r[i]); - if (cmp != 0) { - return cmp; - } - } - return 0; - }; - resultRows.sort(rowComp); - expectedRows.sort(rowComp); - for (int i = 0; i < resultRows.size(); i++) { - Object[] resultRow = resultRows.get(i); - Object[] expectedRow = expectedRows.get(i); - for (int j = 0; j < resultRow.length; j++) { - Assert.assertEquals(valueComp.compare(resultRow[j], expectedRow[j]), 0, - "Not match at (" + i + "," + j + ")! Expected: " + expectedRow[j] + " Actual: " + resultRow[j]); - } - } - } +public class QueryTestSet { - @DataProvider(name = "testDataWithSql") - private Object[][] provideTestSql() { + @DataProvider(name = "testSql") + public Object[][] provideTestSql() { return new Object[][]{ // Order BY LIMIT new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"}, @@ -317,19 +196,4 @@ public class QueryRunnerTest extends QueryRunnerTestBase { new Object[]{"SELECT col1, COUNT(col3) FROM a GROUP BY col1 HAVING SUM(col3) > 40 AND SUM(col3) < 30"}, }; } - - @DataProvider(name = "testDataWithSqlToFinalRowCount") - private Object[][] provideTestSqlAndRowCount() { - return new Object[][] { - // using join clause - new Object[]{"SELECT * FROM a JOIN b USING (col1)", 15}, - - // test dateTrunc - // - on leaf stage - new Object[]{"SELECT dateTrunc('DAY', ts) FROM a LIMIT 10", 15}, - // - on intermediate stage - new Object[]{"SELECT dateTrunc('DAY', round(a.ts, b.ts)) FROM a JOIN b " - + "ON a.col1 = b.col1 AND a.col2 = b.col2", 15}, - }; - } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java index 872eec4313..e1f685a363 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java @@ -46,7 +46,7 @@ public class QueryRunnerTest extends QueryRunnerTestBase { Assert.assertEquals(resultRows.size(), expectedRows); } - @Test(dataProvider = "testDataWithSql") + @Test(dataProvider = "testSql") public void testSqlWithH2Checker(String sql) throws Exception { List<Object[]> resultRows = queryRunner(sql); @@ -144,180 +144,6 @@ public class QueryRunnerTest extends QueryRunnerTestBase { } } - @DataProvider(name = "testDataWithSql") - private Object[][] provideTestSql() { - return new Object[][]{ - // Order BY LIMIT - new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3"}, - new Object[]{"SELECT * FROM a ORDER BY col1, ts LIMIT 10"}, - new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 20"}, - - // No match filter - new Object[]{"SELECT * FROM b WHERE col3 < 0.5"}, - - // Hybrid table - new Object[]{"SELECT * FROM d"}, - - // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1), - // thus the final JOIN result will be 15 x 1 = 15. - // Next join with table C which has (5 on server1 and 10 on server2), since data is identical. each of the row - // of the A JOIN B will have identical value of col3 as table C.col3 has. Since the values are cycling between - // (1, 42, 1, 42, 1). we will have 9 1s, and 6 42s, total result count will be 9 * 9 + 6 * 6 = 117 - new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON a.col3 = c.col3"}, - // Reverse the order of join condition and join table order. - new Object[]{"SELECT * FROM a JOIN b ON b.col1 = a.col1 JOIN c ON c.col3 = a.col3"}, - - // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1), - // thus the final JOIN result will be 15 x 1 = 15. - new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1"}, - - // Query with function in JOIN keys, table A and B are both (1, 42, 1, 42, 1), with table A cycling 3 times. - // Because: - // - MOD(a.col3, 2) will have 6 (42)s equal to 0 and 9 (1)s equals to 1 - // - MOD(b.col3, 3) will have 2 (42)s equal to 0 and 3 (1)s equals to 1; - // final results are 6 * 2 + 9 * 3 = 39 rows - new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON MOD(a.col3, 2) = MOD(b.col3, 3)"}, - - // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1), - // thus the final JOIN result will be 15 x 1 = 15. - new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2"}, - // Reverse the order of join condition and join table order. - new Object[]{"SELECT * FROM a JOIN b on b.col1 = a.col1 AND b.col2 = a.col2"}, - - // LEFT JOIN - new Object[]{"SELECT * FROM a LEFT JOIN b on a.col1 = b.col2"}, - - new Object[]{"SELECT a.col1, SUM(CASE WHEN b.col3 IS NULL THEN 0 ELSE b.col3 END) " - + " FROM a LEFT JOIN b on a.col1 = b.col2 GROUP BY a.col1"}, - - // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1), - // but only 1 out of 5 rows from table A will be selected out; and all in table B will be selected. - // thus the final JOIN result will be 1 x 3 x 1 = 3. - new Object[]{"SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON a.col1 = b.col2 " - + " WHERE a.col3 >= 0 AND a.col2 = 'alice' AND b.col3 >= 0"}, - - // Join query with IN and Not-IN clause. Table A's side of join will return 9 rows and Table B's side will - // return 2 rows. Join will be only on col1=bar and since A will return 3 rows with that value and B will return - // 1 row, the final output will have 3 rows. - new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 " - + " WHERE a.col1 IN ('foo', 'bar', 'alice') AND b.col2 NOT IN ('foo', 'alice')"}, - - // Same query as above but written using OR/AND instead of IN. - new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 " - + " WHERE (a.col1 = 'foo' OR a.col1 = 'bar' OR a.col1 = 'alice') AND b.col2 != 'foo'" - + " AND b.col2 != 'alice'"}, - - // Same as above but with single argument IN clauses. Left side of the join returns 3 rows, and the right side - // returns 5 rows. Only key where join succeeds is col1=foo, and since table B has only 1 row with that value, - // the number of rows should be 3. - new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 " - + " WHERE a.col1 IN ('foo') AND b.col2 NOT IN ('')"}, - - // Range conditions with continuous and non-continuous range. - new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 " - + " WHERE a.col3 IN (1, 2, 3) OR (a.col3 > 10 AND a.col3 < 50)"}, - - new Object[]{"SELECT col1, SUM(col3) FROM a WHERE a.col3 BETWEEN 23 AND 36 " - + " GROUP BY col1 HAVING SUM(col3) > 10.0 AND MIN(col3) <> 123 AND MAX(col3) BETWEEN 10 AND 20"}, - - new Object[]{"SELECT col1, SUM(col3) FROM a WHERE (col3 > 0 AND col3 < 45) AND (col3 > 15 AND col3 < 50) " - + " GROUP BY col1 HAVING (SUM(col3) > 10 AND SUM(col3) < 20) AND (SUM(col3) > 30 AND SUM(col3) < 40)"}, - - // Projection pushdown - new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'"}, - - // Inequality JOIN & partial filter pushdown - new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3"}, - - new Object[]{"SELECT * FROM a, b WHERE a.col1 > b.col2 AND a.col3 > b.col3"}, - - // Aggregation with group by - new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 GROUP BY a.col1"}, - - // Aggregation with multiple group key - new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 GROUP BY a.col1, a.col2"}, - - // Aggregation without GROUP BY - new Object[]{"SELECT SUM(col3) FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'"}, - - // Aggregation with GROUP BY on a count star reference - new Object[]{"SELECT a.col1, COUNT(*) FROM a WHERE a.col3 >= 0 GROUP BY a.col1"}, - - // project in intermediate stage - // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1), - // col1 on both are "foo", "bar", "alice", "bob", "charlie" - // col2 on both are "foo", "bar", "alice", "foo", "bar", - // filtered at : ^ ^ - // thus the final JOIN result will have 6 rows: 3 "foo" <-> "foo"; and 3 "bob" <-> "bob" - new Object[]{"SELECT a.col1, a.col2, a.ts, b.col1, b.col3 FROM a JOIN b ON a.col1 = b.col2 " - + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0"}, - - // Making transform after JOIN, number of rows should be the same as JOIN result. - new Object[]{"SELECT a.col1, a.ts, a.col3 - b.col3 FROM a JOIN b ON a.col1 = b.col2 " - + " WHERE a.col3 >= 0 AND b.col3 >= 0"}, - - // Making transform after GROUP-BY, number of rows should be the same as GROUP-BY result. - new Object[]{"SELECT a.col1, a.col2, SUM(a.col3) - MIN(a.col3) FROM a" - + " WHERE a.col3 >= 0 GROUP BY a.col1, a.col2"}, - - // GROUP BY after JOIN - // - optimizable transport for GROUP BY key after JOIN, using SINGLETON exchange - // only 3 GROUP BY key exist because b.col2 cycles between "foo", "bar", "alice". - new Object[]{"SELECT a.col1, SUM(b.col3), COUNT(*), SUM(2) FROM a JOIN b ON a.col1 = b.col2 " - + " WHERE a.col3 >= 0 GROUP BY a.col1"}, - // - non-optimizable transport for GROUP BY key after JOIN, using HASH exchange - // only 2 GROUP BY key exist for b.col3. - new Object[]{"SELECT b.col3, SUM(a.col3) FROM a JOIN b" - + " on a.col1 = b.col1 AND a.col2 = b.col2 GROUP BY b.col3"}, - - // Sub-query - new Object[]{"SELECT b.col1, b.col3, i.maxVal FROM b JOIN " - + " (SELECT a.col2 AS joinKey, MAX(a.col3) AS maxVal FROM a GROUP BY a.col2) AS i " - + " ON b.col1 = i.joinKey"}, - - // Sub-query with IN clause to SEMI JOIN. - new Object[]{"SELECT b.col1, b.col2, SUM(b.col3) * 100 / COUNT(b.col3) FROM b WHERE b.col1 IN " - + " (SELECT a.col2 FROM a WHERE a.col2 != 'foo') GROUP BY b.col1, b.col2"}, - new Object[]{"SELECT SUM(b.col3) FROM b WHERE b.col3 > (SELECT AVG(a.col3) FROM a WHERE a.col2 != 'bar')"}, - - // Aggregate query with HAVING clause, "foo" and "bar" occurred 6/2 times each and "alice" occurred 3/1 times - // numbers are cycle in (1, 42, 1, 42, 1), and (foo, bar, alice, foo, bar) - // - COUNT(*) < 5 matches "alice" (3 times) - // - COUNT(*) > 5 matches "foo" and "bar" (6 times); so both will be selected out SUM(a.col3) = (1 + 42) * 3 - // - last condition doesn't match anything. - // total to 3 rows. - new Object[]{"SELECT a.col2, COUNT(*), MAX(a.col3), MIN(a.col3), SUM(a.col3) FROM a GROUP BY a.col2 " - + "HAVING COUNT(*) < 5 OR (COUNT(*) > 5 AND SUM(a.col3) >= 10)" - + "OR (MIN(a.col3) != 20 AND SUM(a.col3) = 100)"}, - new Object[]{"SELECT COUNT(*) AS Count, MAX(a.col3) AS \"max\" FROM a GROUP BY a.col2 " - + "HAVING Count > 1 AND \"max\" < 50"}, - - // Order-by - new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON a.col1 = b.col1 ORDER BY a.col3, b.col3 DESC"}, - new Object[]{"SELECT MAX(a.col3) FROM a GROUP BY a.col2 ORDER BY MAX(a.col3) - MIN(a.col3)"}, - - // Test CAST - // - implicit CAST - new Object[]{"SELECT a.col1, a.col2, AVG(a.col3) FROM a GROUP BY a.col1, a.col2"}, - new Object[]{"SELECT a.col1 FROM a WHERE a.col3 >= 0.5 AND a.col3 < 0.7 OR a.col3 = 42.0"}, - new Object[]{"SELECT a.col1, SUM(a.col3) FROM a GROUP BY a.col1 " - + " HAVING MIN(a.col3) > 0.5 AND MIN(a.col3) <> 0.7 OR MIN(a.col3) > 30"}, - // - explicit CAST - new Object[]{"SELECT a.col1, CAST(SUM(a.col3) AS BIGINT) FROM a GROUP BY a.col1"}, - - // Test DISTINCT - // - distinct value done via GROUP BY with empty expr aggregation list. - new Object[]{"SELECT a.col2, a.col3 FROM a JOIN b ON a.col1 = b.col1 " - + " WHERE b.col3 > 0 GROUP BY a.col2, a.col3"}, - - // Test optimized constant literal. - new Object[]{"SELECT col1 FROM a WHERE col3 > 0 AND col3 < -5"}, - new Object[]{"SELECT COALESCE(SUM(col3), 0) FROM a WHERE col1 = 'foo' AND col1 = 'bar'"}, - new Object[]{"SELECT SUM(CAST(col3 AS INTEGER)) FROM a HAVING MIN(col3) BETWEEN 1 AND 0"}, - new Object[]{"SELECT col1, COUNT(col3) FROM a GROUP BY col1 HAVING SUM(col3) > 40 AND SUM(col3) < 30"}, - }; - } - @DataProvider(name = "testDataWithSqlToFinalRowCount") private Object[][] provideTestSqlAndRowCount() { return new Object[][] { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java index 702e7b466d..9ad1723362 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java @@ -36,6 +36,7 @@ import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.QueryEnvironmentTestUtils; import org.apache.pinot.query.QueryServerEnclosure; +import org.apache.pinot.query.QueryTestSet; import org.apache.pinot.query.mailbox.GrpcMailboxService; import org.apache.pinot.query.routing.WorkerInstance; import org.apache.pinot.query.service.QueryConfig; @@ -50,7 +51,7 @@ import org.testng.annotations.BeforeClass; -public class QueryRunnerTestBase { +public class QueryRunnerTestBase extends QueryTestSet { private static final File INDEX_DIR_S1_A = new File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableA"); private static final File INDEX_DIR_S1_B = new File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableB"); private static final File INDEX_DIR_S1_C = new File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableC"); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java index 9057d1a7ec..34e52f56bb 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Random; import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.QueryEnvironmentTestUtils; +import org.apache.pinot.query.QueryTestSet; import org.apache.pinot.query.planner.PlannerUtils; import org.apache.pinot.query.planner.QueryPlan; import org.apache.pinot.query.runtime.QueryRunner; @@ -32,11 +33,10 @@ import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -public class QueryDispatcherTest { +public class QueryDispatcherTest extends QueryTestSet { private static final Random RANDOM_REQUEST_ID_GEN = new Random(); private static final int QUERY_SERVER_COUNT = 2; private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>(); @@ -70,23 +70,13 @@ public class QueryDispatcherTest { } } - @Test(dataProvider = "testDataWithSqlToCompiledAsWorkerRequest") + @Test(dataProvider = "testSql") public void testQueryDispatcherCanSendCorrectPayload(String sql) throws Exception { QueryPlan queryPlan = _queryEnvironment.planQuery(sql); QueryDispatcher dispatcher = new QueryDispatcher(); int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(), queryPlan); Assert.assertTrue(PlannerUtils.isRootStage(reducerStageId)); - } - - @DataProvider(name = "testDataWithSqlToCompiledAsWorkerRequest") - private Object[][] provideTestSqlToCompiledToWorkerRequest() { - return new Object[][] { - new Object[]{"SELECT * FROM b"}, - new Object[]{"SELECT * FROM a"}, - new Object[]{"SELECT * FROM a JOIN b ON a.col3 = b.col3"}, - new Object[]{"SELECT a.col1, a.ts, c.col2, c.col3 FROM a JOIN c ON a.col1 = c.col2 " - + " WHERE (a.col3 >= 0 OR a.col2 = 'foo') AND c.col3 >= 0"}, - }; + dispatcher.shutdown(); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java index 245f78e651..7202ab8d91 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.query.service; import com.google.common.collect.Lists; +import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import java.util.Comparator; import java.util.HashMap; @@ -31,6 +32,7 @@ import org.apache.pinot.common.proto.Worker; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.QueryEnvironmentTestUtils; +import org.apache.pinot.query.QueryTestSet; import org.apache.pinot.query.planner.QueryPlan; import org.apache.pinot.query.planner.StageMetadata; import org.apache.pinot.query.planner.stage.StageNode; @@ -42,13 +44,12 @@ import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.any; -public class QueryServerTest { +public class QueryServerTest extends QueryTestSet { private static final Random RANDOM_REQUEST_ID_GEN = new Random(); private static final int QUERY_SERVER_COUNT = 2; private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>(); @@ -88,7 +89,7 @@ public class QueryServerTest { } @SuppressWarnings("unchecked") - @Test(dataProvider = "testDataWithSqlToCompiledAsWorkerRequest") + @Test(dataProvider = "testSql") public void testWorkerAcceptsWorkerRequestCorrect(String sql) throws Exception { QueryPlan queryPlan = _queryEnvironment.planQuery(sql); @@ -126,17 +127,6 @@ public class QueryServerTest { } } - @DataProvider(name = "testDataWithSqlToCompiledAsWorkerRequest") - private Object[][] provideTestSqlToCompiledToWorkerRequest() { - return new Object[][] { - new Object[]{"SELECT * FROM b"}, - new Object[]{"SELECT * FROM a"}, - new Object[]{"SELECT * FROM a JOIN b ON a.col3 = b.col3"}, - new Object[]{"SELECT a.col1, a.ts, c.col2, c.col3 FROM a JOIN c ON a.col1 = c.col2 " - + " WHERE (a.col3 >= 0 OR a.col2 = 'foo') AND c.col3 >= 0"}, - }; - } - private static boolean isMetadataMapsEqual(StageMetadata left, StageMetadata right) { return left.getServerInstances().equals(right.getServerInstances()) && left.getServerInstanceToSegmentsMap().equals(right.getServerInstanceToSegmentsMap()) @@ -163,11 +153,12 @@ public class QueryServerTest { private void submitRequest(Worker.QueryRequest queryRequest) { String host = queryRequest.getMetadataMap().get("SERVER_INSTANCE_HOST"); int port = Integer.parseInt(queryRequest.getMetadataMap().get("SERVER_INSTANCE_PORT")); - PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub stub = - PinotQueryWorkerGrpc.newBlockingStub(ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()); + ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); + PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub stub = PinotQueryWorkerGrpc.newBlockingStub(channel); Worker.QueryResponse resp = stub.submit(queryRequest); // TODO: validate meaningful return value Assert.assertNotNull(resp.getMetadataMap().get("OK")); + channel.shutdown(); } private Worker.QueryRequest getQueryRequest(QueryPlan queryPlan, int stageId) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org