This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7ab5a13637 Add integration test for lookup join in the multi-stage engine (#15244) 7ab5a13637 is described below commit 7ab5a1363769259fd12d9ca8c311151121f0a679 Author: Krishan Goyal <kris...@startree.ai> AuthorDate: Tue Mar 18 22:31:33 2025 +0530 Add integration test for lookup join in the multi-stage engine (#15244) --- .../tests/BaseClusterIntegrationTest.java | 24 +++++++++++ .../tests/ClusterIntegrationTestUtils.java | 10 ++++- .../tests/MultiStageEngineIntegrationTest.java | 46 ++++++++++++++++++++++ .../src/test/resources/dimDayOfWeek_config.json | 18 +++++++++ .../src/test/resources/dimDayOfWeek_data.csv | 8 ++++ .../src/test/resources/dimDayOfWeek_schema.json | 16 ++++++++ 6 files changed, 121 insertions(+), 1 deletion(-) diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index 7b59e397d9..dac9873c62 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.URL; import java.sql.Connection; import java.sql.DriverManager; import java.time.Duration; @@ -58,6 +59,7 @@ import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.spi.utils.JsonUtils; @@ -279,6 +281,13 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { return Schema.fromInputStream(new FileInputStream(schemaFile)); } + protected TableConfig createTableConfig(String tableConfigFileName) + throws IOException { + URL configPathUrl = getClass().getClassLoader().getResource(tableConfigFileName); + Assert.assertNotNull(configPathUrl); + return createTableConfig(new File(configPathUrl.getFile())); + } + protected TableConfig createTableConfig(File tableConfigFile) throws IOException { InputStream inputStream = new FileInputStream(tableConfigFile); @@ -600,6 +609,21 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { return false; } + protected void createAndUploadSegmentFromFile(TableConfig tableConfig, Schema schema, String dataFilePath, + FileFormat fileFormat, long expectedNoOfDocs, long timeoutMs) throws Exception { + URL dataPathUrl = getClass().getClassLoader().getResource(dataFilePath); + assert dataPathUrl != null; + File file = new File(dataPathUrl.getFile()); + + TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir); + ClusterIntegrationTestUtils.buildSegmentFromFile(file, tableConfig, schema, "%", _segmentDir, _tarDir, fileFormat); + uploadSegments(tableConfig.getTableName(), _tarDir); + + TestUtils.waitForCondition(() -> getCurrentCountStarResult(tableConfig.getTableName()) == expectedNoOfDocs, 100L, + timeoutMs, "Failed to load " + expectedNoOfDocs + " documents in table " + tableConfig.getTableName(), + true, Duration.ofMillis(timeoutMs / 10)); + } + protected List<File> getAllAvroFiles() throws Exception { // Unpack the Avro files diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java index 2a73c93994..e53c2ce300 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java @@ -75,6 +75,7 @@ import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.spi.stream.StreamDataProvider; import org.apache.pinot.spi.utils.JsonUtils; @@ -341,8 +342,15 @@ public class ClusterIntegrationTestUtils { public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig, org.apache.pinot.spi.data.Schema schema, String segmentNamePostfix, File segmentDir, File tarDir) throws Exception { + buildSegmentFromFile(avroFile, tableConfig, schema, segmentNamePostfix, segmentDir, tarDir, FileFormat.AVRO); + } + + public static void buildSegmentFromFile(File file, TableConfig tableConfig, org.apache.pinot.spi.data.Schema schema, + String segmentNamePostfix, File segmentDir, File tarDir, FileFormat fileFormat) + throws Exception { SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema); - segmentGeneratorConfig.setInputFilePath(avroFile.getPath()); + segmentGeneratorConfig.setFormat(fileFormat); + segmentGeneratorConfig.setInputFilePath(file.getPath()); segmentGeneratorConfig.setOutDir(segmentDir.getPath()); segmentGeneratorConfig.setTableName(tableConfig.getTableName()); segmentGeneratorConfig.setSegmentNamePostfix(segmentNamePostfix); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java index c295efe49a..7b75a681d0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java @@ -29,9 +29,11 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -44,11 +46,13 @@ import org.apache.commons.lang3.StringUtils; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TenantConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.TransformConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; @@ -75,6 +79,11 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS private static final String TABLE_NAME_WITH_DATABASE = DATABASE_NAME + "." + DEFAULT_TABLE_NAME; private String _tableName = DEFAULT_TABLE_NAME; + private static final String DIM_TABLE_DATA_PATH = "dimDayOfWeek_data.csv"; + private static final String DIM_TABLE_SCHEMA_PATH = "dimDayOfWeek_schema.json"; + private static final String DIM_TABLE_TABLE_CONFIG_PATH = "dimDayOfWeek_config.json"; + private static final Integer DIM_NUMBER_OF_RECORDS = 7; + @Override protected String getSchemaFileName() { return SCHEMA_FILE_NAME; @@ -1618,6 +1627,43 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS } @Test + public void testLookupJoin() throws Exception { + + Schema lookupTableSchema = createSchema(DIM_TABLE_SCHEMA_PATH); + addSchema(lookupTableSchema); + TableConfig tableConfig = createTableConfig(DIM_TABLE_TABLE_CONFIG_PATH); + TenantConfig tenantConfig = new TenantConfig(getBrokerTenant(), getServerTenant(), null); + tableConfig.setTenantConfig(tenantConfig); + addTableConfig(tableConfig); + createAndUploadSegmentFromFile(tableConfig, lookupTableSchema, DIM_TABLE_DATA_PATH, FileFormat.CSV, + DIM_NUMBER_OF_RECORDS, 60_000); + + // Compare total rows in the primary table with number of rows in the result of the join with lookup table + String query = "select count(*) from " + getTableName(); + JsonNode jsonNode = postQuery(query); + long totalRowsInTable = jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(); + + query = "select /*+ joinOptions(join_strategy='lookup') */ AirlineID, DayOfWeek, dayName from " + getTableName() + + " join daysOfWeek ON DayOfWeek = dayId where dayName in ('Monday', 'Tuesday', 'Wednesday')"; + jsonNode = postQuery(query); + long result = jsonNode.get("resultTable").get("rows").size(); + assertTrue(result > 0); + assertTrue(result < totalRowsInTable); + + // Verify that LOOKUP_JOIN stage is present and HASH_JOIN stage is not present in the query plan + Set<String> stages = new HashSet<>(); + JsonNode currentNode = jsonNode.get("stageStats").get("children"); + while (currentNode != null) { + currentNode = currentNode.get(0); + stages.add(currentNode.get("type").asText()); + currentNode = currentNode.get("children"); + } + assertTrue(stages.contains("LOOKUP_JOIN"), "Could not find LOOKUP_JOIN stage in the query plan"); + assertFalse(stages.contains("HASH_JOIN"), "HASH_JOIN stage should not be present in the query plan"); + + dropOfflineTable(tableConfig.getTableName()); + } + public void testSearchLiteralFilter() throws Exception { String sqlQuery = "WITH CTE_B AS (SELECT 1692057600000 AS __ts FROM mytable GROUP BY __ts) SELECT 1692057600000 AS __ts FROM " diff --git a/pinot-integration-tests/src/test/resources/dimDayOfWeek_config.json b/pinot-integration-tests/src/test/resources/dimDayOfWeek_config.json new file mode 100644 index 0000000000..20f4f0d303 --- /dev/null +++ b/pinot-integration-tests/src/test/resources/dimDayOfWeek_config.json @@ -0,0 +1,18 @@ +{ + "tableName": "daysOfWeek", + "tableType": "OFFLINE", + "isDimTable": true, + "segmentsConfig": { + "segmentPushType": "REFRESH", + "replication": "1" + }, + "tenants": { + }, + "tableIndexConfig": { + "loadMode": "MMAP" + }, + "metadata": { + "customConfigs": { + } + } +} diff --git a/pinot-integration-tests/src/test/resources/dimDayOfWeek_data.csv b/pinot-integration-tests/src/test/resources/dimDayOfWeek_data.csv new file mode 100644 index 0000000000..a3ed71ffd4 --- /dev/null +++ b/pinot-integration-tests/src/test/resources/dimDayOfWeek_data.csv @@ -0,0 +1,8 @@ +dayId,dayName +1,"Sunday" +2,"Monday" +3,"Tuesday" +4,"Wednesday" +5,"Thursday" +6,"Friday" +7,"Saturday" \ No newline at end of file diff --git a/pinot-integration-tests/src/test/resources/dimDayOfWeek_schema.json b/pinot-integration-tests/src/test/resources/dimDayOfWeek_schema.json new file mode 100644 index 0000000000..967faa81f7 --- /dev/null +++ b/pinot-integration-tests/src/test/resources/dimDayOfWeek_schema.json @@ -0,0 +1,16 @@ +{ + "dimensionFieldSpecs": [ + { + "dataType": "INT", + "name": "dayId" + }, + { + "dataType": "STRING", + "name": "dayName" + } + ], + "schemaName": "daysOfWeek", + "primaryKeyColumns": [ + "dayId" + ] +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org