This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ab5a13637 Add integration test for lookup join in the multi-stage engine (#15244)
7ab5a13637 is described below

commit 7ab5a1363769259fd12d9ca8c311151121f0a679
Author: Krishan Goyal <kris...@startree.ai>
AuthorDate: Tue Mar 18 22:31:33 2025 +0530

    Add integration test for lookup join in the multi-stage engine (#15244)
---
 .../tests/BaseClusterIntegrationTest.java          | 24 +++++++++++
 .../tests/ClusterIntegrationTestUtils.java         | 10 ++++-
 .../tests/MultiStageEngineIntegrationTest.java     | 46 ++++++++++++++++++++++
 .../src/test/resources/dimDayOfWeek_config.json    | 18 +++++++++
 .../src/test/resources/dimDayOfWeek_data.csv       |  8 ++++
 .../src/test/resources/dimDayOfWeek_schema.json    | 16 ++++++++
 6 files changed, 121 insertions(+), 1 deletion(-)

diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 7b59e397d9..dac9873c62 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URL;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.time.Duration;
@@ -58,6 +59,7 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -279,6 +281,13 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     return Schema.fromInputStream(new FileInputStream(schemaFile));
   }
 
+  protected TableConfig createTableConfig(String tableConfigFileName)
+      throws IOException {
+    URL configPathUrl = getClass().getClassLoader().getResource(tableConfigFileName);
+    Assert.assertNotNull(configPathUrl, "Could not find resource: " + tableConfigFileName);
+    return createTableConfig(new File(configPathUrl.getFile()));
+  }
+
   protected TableConfig createTableConfig(File tableConfigFile)
       throws IOException {
     InputStream inputStream = new FileInputStream(tableConfigFile);
@@ -600,6 +609,21 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     return false;
   }
 
+  protected void createAndUploadSegmentFromFile(TableConfig tableConfig, Schema schema, String dataFilePath,
+      FileFormat fileFormat, long expectedNoOfDocs, long timeoutMs) throws Exception {
+    URL dataPathUrl = getClass().getClassLoader().getResource(dataFilePath);
+    Assert.assertNotNull(dataPathUrl, "Could not find resource: " + dataFilePath);
+    File file = new File(dataPathUrl.toURI());
+
+    TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
+    ClusterIntegrationTestUtils.buildSegmentFromFile(file, tableConfig, schema, "%", _segmentDir, _tarDir, fileFormat);
+    uploadSegments(tableConfig.getTableName(), _tarDir);
+
+    TestUtils.waitForCondition(() -> getCurrentCountStarResult(tableConfig.getTableName()) == expectedNoOfDocs, 100L,
+        timeoutMs, "Failed to load " + expectedNoOfDocs + " documents in table " + tableConfig.getTableName(),
+        true, Duration.ofMillis(timeoutMs / 10));
+  }
+
   protected List<File> getAllAvroFiles()
       throws Exception {
     // Unpack the Avro files
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 2a73c93994..e53c2ce300 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -75,6 +75,7 @@ import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -341,8 +342,15 @@ public class ClusterIntegrationTestUtils {
   public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
       org.apache.pinot.spi.data.Schema schema, String segmentNamePostfix, File segmentDir, File tarDir)
       throws Exception {
+    buildSegmentFromFile(avroFile, tableConfig, schema, segmentNamePostfix, segmentDir, tarDir, FileFormat.AVRO);
+  }
+
+  public static void buildSegmentFromFile(File file, TableConfig tableConfig, org.apache.pinot.spi.data.Schema schema,
+      String segmentNamePostfix, File segmentDir, File tarDir, FileFormat fileFormat)
+      throws Exception {
     SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
-    segmentGeneratorConfig.setInputFilePath(avroFile.getPath());
+    segmentGeneratorConfig.setFormat(fileFormat);
+    segmentGeneratorConfig.setInputFilePath(file.getPath());
     segmentGeneratorConfig.setOutDir(segmentDir.getPath());
     segmentGeneratorConfig.setTableName(tableConfig.getTableName());
     segmentGeneratorConfig.setSegmentNamePostfix(segmentNamePostfix);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index c295efe49a..7b75a681d0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -29,9 +29,11 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -44,11 +46,13 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
@@ -75,6 +79,11 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
   private static final String TABLE_NAME_WITH_DATABASE = DATABASE_NAME + "." + DEFAULT_TABLE_NAME;
   private String _tableName = DEFAULT_TABLE_NAME;
 
+  private static final String DIM_TABLE_DATA_PATH = "dimDayOfWeek_data.csv";
+  private static final String DIM_TABLE_SCHEMA_PATH = "dimDayOfWeek_schema.json";
+  private static final String DIM_TABLE_TABLE_CONFIG_PATH = "dimDayOfWeek_config.json";
+  private static final int DIM_NUMBER_OF_RECORDS = 7;
+
   @Override
   protected String getSchemaFileName() {
     return SCHEMA_FILE_NAME;
@@ -1618,6 +1627,43 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
   }
 
   @Test
+  public void testLookupJoin() throws Exception {
+    Schema lookupTableSchema = createSchema(DIM_TABLE_SCHEMA_PATH);
+    addSchema(lookupTableSchema);
+    TableConfig tableConfig = createTableConfig(DIM_TABLE_TABLE_CONFIG_PATH);
+    TenantConfig tenantConfig = new TenantConfig(getBrokerTenant(), getServerTenant(), null);
+    tableConfig.setTenantConfig(tenantConfig);
+    addTableConfig(tableConfig);
+    createAndUploadSegmentFromFile(tableConfig, lookupTableSchema, DIM_TABLE_DATA_PATH, FileFormat.CSV,
+        DIM_NUMBER_OF_RECORDS, 60_000);
+
+    // Compare total rows in the primary table with number of rows in the result of the join with lookup table
+    String query = "select count(*) from " + getTableName();
+    JsonNode jsonNode = postQuery(query);
+    long totalRowsInTable = jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+
+    query = "select /*+ joinOptions(join_strategy='lookup') */ AirlineID, DayOfWeek, dayName from " + getTableName()
+        + " join daysOfWeek ON DayOfWeek = dayId where dayName in ('Monday', 'Tuesday', 'Wednesday')";
+    jsonNode = postQuery(query);
+    long result = jsonNode.get("resultTable").get("rows").size();
+    assertTrue(result > 0);
+    assertTrue(result < totalRowsInTable);
+
+    // Walk the first-child chain of the stage stats tree: it must contain LOOKUP_JOIN and no HASH_JOIN
+    Set<String> stages = new HashSet<>();
+    JsonNode currentNode = jsonNode.get("stageStats").get("children");
+    while (currentNode != null && currentNode.size() > 0) {
+      currentNode = currentNode.get(0);
+      stages.add(currentNode.get("type").asText());
+      currentNode = currentNode.get("children");
+    }
+    assertTrue(stages.contains("LOOKUP_JOIN"), "Could not find LOOKUP_JOIN stage in the query plan");
+    assertFalse(stages.contains("HASH_JOIN"), "HASH_JOIN stage should not be present in the query plan");
+
+    dropOfflineTable(tableConfig.getTableName());
+  }
+
+  @Test
   public void testSearchLiteralFilter() throws Exception {
     String sqlQuery =
         "WITH CTE_B AS (SELECT 1692057600000 AS __ts FROM mytable GROUP BY __ts) SELECT 1692057600000 AS __ts FROM "
diff --git a/pinot-integration-tests/src/test/resources/dimDayOfWeek_config.json b/pinot-integration-tests/src/test/resources/dimDayOfWeek_config.json
new file mode 100644
index 0000000000..20f4f0d303
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/dimDayOfWeek_config.json
@@ -0,0 +1,18 @@
+{
+  "tableName": "daysOfWeek",
+  "tableType": "OFFLINE",
+  "isDimTable": true,
+  "segmentsConfig": {
+    "segmentPushType": "REFRESH",
+    "replication": "1"
+  },
+  "tenants": {
+  },
+  "tableIndexConfig": {
+    "loadMode": "MMAP"
+  },
+  "metadata": {
+    "customConfigs": {
+    }
+  }
+}
diff --git a/pinot-integration-tests/src/test/resources/dimDayOfWeek_data.csv b/pinot-integration-tests/src/test/resources/dimDayOfWeek_data.csv
new file mode 100644
index 0000000000..a3ed71ffd4
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/dimDayOfWeek_data.csv
@@ -0,0 +1,8 @@
+dayId,dayName
+1,"Sunday"
+2,"Monday"
+3,"Tuesday"
+4,"Wednesday"
+5,"Thursday"
+6,"Friday"
+7,"Saturday"
\ No newline at end of file
diff --git a/pinot-integration-tests/src/test/resources/dimDayOfWeek_schema.json b/pinot-integration-tests/src/test/resources/dimDayOfWeek_schema.json
new file mode 100644
index 0000000000..967faa81f7
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/dimDayOfWeek_schema.json
@@ -0,0 +1,16 @@
+{
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "INT",
+      "name": "dayId"
+    },
+    {
+      "dataType": "STRING",
+      "name": "dayName"
+    }
+  ],
+  "schemaName": "daysOfWeek",
+  "primaryKeyColumns": [
+    "dayId"
+  ]
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to