This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 81e858b  Adding integration test for null handling from realtime source (#5870)
81e858b is described below

commit 81e858b8f661e229e265ffc86d079da94f692225
Author: icefury71 <chinmay.cere...@gmail.com>
AuthorDate: Mon Aug 17 10:46:03 2020 -0700

    Adding integration test for null handling from realtime source (#5870)
    
    Adding an end-end integration test for creating a Pinot table with Kafka 
records including null values and running queries against this table (with the 
IS NOT NULL predicate). Related to #4230
---
 .../tests/BaseClusterIntegrationTest.java          |  10 +-
 .../tests/NullHandlingIntegrationTest.java         | 145 +++++++++++++++++++++
 .../src/test/resources/avro_data_with_nulls.tar.gz | Bin 0 -> 2973 bytes
 .../src/test/resources/test_null_handling.schema   |  33 +++++
 .../spi/utils/builder/TableConfigBuilder.java      |   7 +
 5 files changed, 193 insertions(+), 2 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 6fd5065..f80ed63 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -80,6 +80,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = 
Arrays.asList("FlightNum", "Origin");
   private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = 
Collections.singletonList("Origin");
   protected static final int DEFAULT_NUM_REPLICAS = 1;
+  protected static final boolean DEFAULT_NULL_HANDLING_ENABLED = false;
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), 
getClass().getSimpleName());
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
@@ -241,6 +242,10 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     return null;
   }
 
+  protected boolean getNullHandlingEnabled() {
+    return DEFAULT_NULL_HANDLING_ENABLED;
+  }
+
   /**
    * The following methods are based on the getters. Override the getters for 
non-default settings before calling these
    * methods.
@@ -273,7 +278,8 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
         
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
         
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
         
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
-        .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).build();
+        .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+        .setNullHandlingEnabled(getNullHandlingEnabled()).build();
   }
 
   /**
@@ -333,7 +339,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
         
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
         
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
         
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setLLC(useLlc)
-        .setStreamConfigs(streamConfigs).build();
+        .setStreamConfigs(streamConfigs).setNullHandlingEnabled(getNullHandlingEnabled()).build();
   }
 
   /**
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
new file mode 100644
index 0000000..ae0784d
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * Integration test that creates a Kafka broker, creates a Pinot cluster that 
consumes from Kafka and queries Pinot.
+ * The data pushed to Kafka includes null values.
+ */
+public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    // Start Kafka
+    startKafka();
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload the schema and table config
+    addSchema(createSchema());
+    addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));
+
+    // Push data into Kafka
+    pushAvroIntoKafka(avroFiles);
+
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
+
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(10_000L);
+  }
+
+  @Override
+  protected String getAvroTarFileName() {
+    return "avro_data_with_nulls.tar.gz";
+  }
+
+  @Override
+  protected String getSchemaFileName() {
+    return "test_null_handling.schema";
+  }
+
+  @Override
+  @Nullable
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  @Override
+  @Nullable
+  protected List<String> getInvertedIndexColumns() {
+    return null;
+  }
+
+  @Override
+  @Nullable
+  protected List<String> getNoDictionaryColumns() {
+    return null;
+  }
+
+  @Override
+  @Nullable
+  protected List<String> getRangeIndexColumns() {
+    return null;
+  }
+
+  @Override
+  @Nullable
+  protected List<String> getBloomFilterColumns() {
+    return null;
+  }
+
+  @Override
+  protected boolean getNullHandlingEnabled() {
+    return true;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return 100;
+  }
+
+  @Test
+  public void testTotalCount()
+      throws Exception {
+    String query = "SELECT count(*) FROM " + getTableName();
+    testQuery(query, Collections.singletonList(query));
+  }
+
+  @Test
+  public void testCountWithNullDescription()
+          throws Exception {
+    String query = "SELECT count(*) FROM " + getTableName() + " where description IS NOT NULL";
+    testQuery(query, Collections.singletonList(query));
+  }
+
+  @Test
+  public void testCountWithNullDescriptionAndSalary()
+          throws Exception {
+    String query = "SELECT count(*) FROM " + getTableName() + " where description IS NOT NULL AND salary IS NOT NULL";
+    testQuery(query, Collections.singletonList(query));
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/resources/avro_data_with_nulls.tar.gz 
b/pinot-integration-tests/src/test/resources/avro_data_with_nulls.tar.gz
new file mode 100644
index 0000000..2014c97
Binary files /dev/null and 
b/pinot-integration-tests/src/test/resources/avro_data_with_nulls.tar.gz differ
diff --git 
a/pinot-integration-tests/src/test/resources/test_null_handling.schema 
b/pinot-integration-tests/src/test/resources/test_null_handling.schema
new file mode 100644
index 0000000..2113389
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/test_null_handling.schema
@@ -0,0 +1,33 @@
+{
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "INT",
+      "singleValueField": true,
+      "name": "clientId"
+    },
+    {
+      "dataType": "STRING",
+      "singleValueField": true,
+      "name": "city"
+    },
+    {
+      "dataType": "STRING",
+      "singleValueField": true,
+      "name": "description"
+    },
+    {
+      "dataType": "INT",
+      "singleValueField": true,
+      "name": "salary"
+    }
+  ],
+  "timeFieldSpec": {
+    "incomingGranularitySpec": {
+      "timeType": "DAYS",
+      "dataType": "INT",
+      "name": "DaysSinceEpoch"
+    }
+  },
+  "schemaName": "mytable"
+}
+
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 9149b9c..3891bf9 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -88,6 +88,7 @@ public class TableConfigBuilder {
   private List<String> _rangeIndexColumns;
   private Map<String, String> _streamConfigs;
   private SegmentPartitionConfig _segmentPartitionConfig;
+  private boolean _nullHandlingEnabled;
 
   private TableCustomConfig _customConfig;
   private QuotaConfig _quotaConfig;
@@ -257,6 +258,11 @@ public class TableConfigBuilder {
     return this;
   }
 
+  public TableConfigBuilder setNullHandlingEnabled(boolean nullHandlingEnabled) {
+    _nullHandlingEnabled = nullHandlingEnabled;
+    return this;
+  }
+
   public TableConfigBuilder setCustomConfig(TableCustomConfig customConfig) {
     _customConfig = customConfig;
     return this;
@@ -351,6 +357,7 @@ public class TableConfigBuilder {
     indexingConfig.setRangeIndexColumns(_rangeIndexColumns);
     indexingConfig.setStreamConfigs(_streamConfigs);
     indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
+    indexingConfig.setNullHandlingEnabled(_nullHandlingEnabled);
 
     if (_customConfig == null) {
       _customConfig = new TableCustomConfig(null);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to