This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f54e3e73ff [multistage][test] add multi-server, multi-segment test (#9943) f54e3e73ff is described below commit f54e3e73ffca1f919488e1b6413334b535acb194 Author: Rong Rong <ro...@apache.org> AuthorDate: Wed Dec 14 09:01:17 2022 -0800 [multistage][test] add multi-server, multi-segment test (#9943) * randomized test * [stash] use line breaker and partitionColumn * adding line breaker and example test Co-authored-by: Rong Rong <ro...@startree.ai> --- .../pinot/query/runtime/QueryRunnerTestBase.java | 26 +++++++++---- .../runtime/queries/ResourceBasedQueriesTest.java | 45 ++++++++++++++++++---- .../src/test/resources/queries/BasicQuery.json | 8 +++- 3 files changed, 64 insertions(+), 15 deletions(-) diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java index a0a5cedf9d..65fe73e29d 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java @@ -62,7 +62,9 @@ import org.testng.Assert; public abstract class QueryRunnerTestBase extends QueryTestSet { protected static final double DOUBLE_CMP_EPSILON = 0.0001d; - + protected static final String SEGMENT_BREAKER_KEY = "__SEGMENT_BREAKER_KEY__"; + protected static final String SEGMENT_BREAKER_STR = "------"; + protected static final GenericRow SEGMENT_BREAKER_ROW = new GenericRow(); protected static final Random RANDOM_REQUEST_ID_GEN = new Random(); protected QueryEnvironment _queryEnvironment; protected String _reducerHostname; @@ -70,6 +72,10 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { protected Map<ServerInstance, QueryServerEnclosure> _servers = new HashMap<>(); protected GrpcMailboxService _mailboxService; + static { + SEGMENT_BREAKER_ROW.putValue(SEGMENT_BREAKER_KEY, SEGMENT_BREAKER_STR); + } + // -------------------------------------------------------------------------- // QUERY UTILS // -------------------------------------------------------------------------- @@ -210,13 +216,17 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { for (int rowId = 0; rowId < value.size(); rowId++) { GenericRow row = new GenericRow(); List<Object> rawRow = value.get(rowId); - int colId = 0; - for (QueryTestCase.ColumnAndType columnAndType : columnAndTypes) { - row.putValue(columnAndType._name, rawRow.get(colId++)); + if (rawRow.size() == 1 && SEGMENT_BREAKER_STR.equals(rawRow.get(0))) { + result.add(SEGMENT_BREAKER_ROW); + } else { + int colId = 0; + for (QueryTestCase.ColumnAndType columnAndType : columnAndTypes) { + row.putValue(columnAndType._name, rawRow.get(colId++)); + } + // TODO: ts is built-in, but we should allow user overwrite + row.putValue("ts", System.currentTimeMillis()); + result.add(row); } - // TODO: ts is built-in, but we should allow user overwrite - row.putValue("ts", System.currentTimeMillis()); - result.add(row); } return result; } @@ -327,6 +337,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { public List<ColumnAndType> _schema; @JsonProperty("inputs") public List<List<Object>> _inputs; + @JsonProperty("partitionColumns") + public List<String> _partitionColumns; } @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java index 5910416ac4..78553b5e1b 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java @@ -32,6 +32,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -47,6 +48,7 @@ import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory; import org.apache.pinot.query.testutils.QueryTestUtils; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.Assert; @@ -60,20 +62,20 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase { private static final ObjectMapper MAPPER = new ObjectMapper(); private static final Pattern TABLE_NAME_REPLACE_PATTERN = Pattern.compile("\\{([\\w\\d]+)\\}"); private static final String QUERY_TEST_RESOURCE_FOLDER = "queries"; + private static final Random RANDOM = new Random(42); @BeforeClass public void setUp() throws Exception { DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); + // Setting up mock server factories. + // All test data are loaded upfront b/c the mock server and brokers needs to be in sync. MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1"); MockInstanceDataManagerFactory factory2 = new MockInstanceDataManagerFactory("server2"); // Setting up H2 for validation setH2Connection(); - // TODO: all test data are loaded upfront b/c the mock server and brokers needs to be in sync. - // doing it dynamically should be our next step. - // Scan through all the test cases. for (Map.Entry<String, QueryTestCase> testCaseEntry : getTestCases().entrySet()) { String testCaseName = testCaseEntry.getKey(); @@ -86,16 +88,45 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase { Map<String, Schema> schemaMap = new HashMap<>(); for (Map.Entry<String, QueryTestCase.Table> tableEntry : testCase._tables.entrySet()) { String tableName = testCaseName + "_" + tableEntry.getKey(); - // TODO: able to choose table type, now default to OFFLINE + // Testing only OFFLINE table b/c Hybrid table test is a special case to test separately. String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName); org.apache.pinot.spi.data.Schema pinotSchema = constructSchema(tableName, tableEntry.getValue()._schema); schemaMap.put(tableName, pinotSchema); factory1.registerTable(pinotSchema, tableNameWithType); factory2.registerTable(pinotSchema, tableNameWithType); List<QueryTestCase.ColumnAndType> columnAndTypes = tableEntry.getValue()._schema; - // TODO: able to select add rows to server1 or server2 (now default server1) - // TODO: able to select add rows to existing segment or create new one (now default create one segment) - factory1.addSegment(tableNameWithType, toRow(columnAndTypes, tableEntry.getValue()._inputs)); + List<GenericRow> genericRows = toRow(columnAndTypes, tableEntry.getValue()._inputs); + + // generate segments and dump into server1 and server2 + List<String> partitionColumns = tableEntry.getValue()._partitionColumns; + + List<GenericRow> rows1 = new ArrayList<>(); + List<GenericRow> rows2 = new ArrayList<>(); + + for (GenericRow row : genericRows) { + if (row == SEGMENT_BREAKER_ROW) { + factory1.addSegment(tableNameWithType, rows1); + factory2.addSegment(tableNameWithType, rows2); + rows1 = new ArrayList<>(); + rows2 = new ArrayList<>(); + } else { + long partition = 0; + if (partitionColumns == null) { + partition = RANDOM.nextInt(2); + } else { + for (String field : partitionColumns) { + partition = (partition + ((GenericRow) row).getValue(field).hashCode()) % 42; + } + } + if (partition % 2 == 0) { + rows1.add(row); + } else { + rows2.add(row); + } + } + } + factory1.addSegment(tableNameWithType, rows1); + factory2.addSegment(tableNameWithType, rows2); } boolean anyHaveOutput = testCase._queries.stream().anyMatch(q -> q._outputs != null && !q._outputs.isEmpty()); diff --git a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json index b664982c40..41fb23aa62 100644 --- a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json +++ b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json @@ -39,7 +39,13 @@ ], "inputs": [ ["foo", 1, 3.1416], - ["bar", 2, 2.7183] + ["foo", 3, 3.1416], + ["bar", 2, 2.7183], + ["------"], + ["bar", 4, 2.7183] + ], + "partitionColumns": [ + "col1", "col2" ] } }, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org