This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 81e858b Adding integration test for null handling from realtime source (#5870) 81e858b is described below commit 81e858b8f661e229e265ffc86d079da94f692225 Author: icefury71 <chinmay.cere...@gmail.com> AuthorDate: Mon Aug 17 10:46:03 2020 -0700 Adding integration test for null handling from realtime source (#5870) Adding an end-end integration test for creating a Pinot table with Kafka records including null values and running queries against this table (with the IS NOT NULL predicate). Related to #4230 --- .../tests/BaseClusterIntegrationTest.java | 10 +- .../tests/NullHandlingIntegrationTest.java | 145 +++++++++++++++++++++ .../src/test/resources/avro_data_with_nulls.tar.gz | Bin 0 -> 2973 bytes .../src/test/resources/test_null_handling.schema | 33 +++++ .../spi/utils/builder/TableConfigBuilder.java | 7 + 5 files changed, 193 insertions(+), 2 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index 6fd5065..f80ed63 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -80,6 +80,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin"); private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Collections.singletonList("Origin"); protected static final int DEFAULT_NUM_REPLICAS = 1; + protected static final boolean DEFAULT_NULL_HANDLING_ENABLED = false; protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()); protected final File _segmentDir = new File(_tempDir, "segmentDir"); @@ -241,6 +242,10 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { return null; } + protected boolean getNullHandlingEnabled() { + return DEFAULT_NULL_HANDLING_ENABLED; + } + /** * The following methods are based on the getters. Override the getters for non-default settings before calling these * methods. @@ -273,7 +278,8 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()) .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()) .setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant()) - .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).build(); + .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()) + .setNullHandlingEnabled(getNullHandlingEnabled()).build(); } /** @@ -333,7 +339,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()) .setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant()) .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setLLC(useLlc) - .setStreamConfigs(streamConfigs).build(); + .setStreamConfigs(streamConfigs).setNullHandlingEnabled(getNullHandlingEnabled()).build(); } /** diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java new file mode 100644 index 0000000..ae0784d --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import org.apache.commons.io.FileUtils; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + + +/** + * Integration test that creates a Kafka broker, creates a Pinot cluster that consumes from Kafka and queries Pinot. + * The data pushed to Kafka includes null values. + */ +public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet { + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServer(); + + // Start Kafka + startKafka(); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload the schema and table config + addSchema(createSchema()); + addTableConfig(createRealtimeTableConfig(avroFiles.get(0))); + + // Push data into Kafka + pushAvroIntoKafka(avroFiles); + + // Set up the H2 connection + setUpH2Connection(avroFiles); + + // Initialize the query generator + setUpQueryGenerator(avroFiles); + + // Wait for all documents loaded + waitForAllDocsLoaded(10_000L); + } + + @Override + protected String getAvroTarFileName() { + return "avro_data_with_nulls.tar.gz"; + } + + @Override + protected String getSchemaFileName() { + return "test_null_handling.schema"; + } + + @Override + @Nullable + protected String getSortedColumn() { + return null; + } + + @Override + @Nullable + protected List<String> getInvertedIndexColumns() { + return null; + } + + @Override + @Nullable + protected List<String> getNoDictionaryColumns() { + return null; + } + + @Override + @Nullable + protected List<String> getRangeIndexColumns() { + return null; + } + + @Override + @Nullable + protected List<String> getBloomFilterColumns() { + return null; + } + + @Override + protected boolean getNullHandlingEnabled() { + return true; + } + + @Override + protected long getCountStarResult() { + return 100; + } + + @Test + public void testTotalCount() + throws Exception { + String query = "SELECT count(*) FROM " + getTableName(); + testQuery(query, Collections.singletonList(query)); + } + + @Test + public void testCountWithNullDescription() + throws Exception { + String query = "SELECT count(*) FROM " + getTableName() + " where description IS NOT NULL"; + testQuery(query, Collections.singletonList(query)); + } + + @Test + public void testCountWithNullDescriptionAndSalary() + throws Exception { + String query = "SELECT count(*) FROM " + getTableName() + " where description IS NOT NULL AND salary IS NOT NULL"; + testQuery(query, Collections.singletonList(query)); + } +} diff --git a/pinot-integration-tests/src/test/resources/avro_data_with_nulls.tar.gz b/pinot-integration-tests/src/test/resources/avro_data_with_nulls.tar.gz new file mode 100644 index 0000000..2014c97 Binary files /dev/null and b/pinot-integration-tests/src/test/resources/avro_data_with_nulls.tar.gz differ diff --git a/pinot-integration-tests/src/test/resources/test_null_handling.schema b/pinot-integration-tests/src/test/resources/test_null_handling.schema new file mode 100644 index 0000000..2113389 --- /dev/null +++ b/pinot-integration-tests/src/test/resources/test_null_handling.schema @@ -0,0 +1,33 @@ +{ + "dimensionFieldSpecs": [ + { + "dataType": "INT", + "singleValueField": true, + "name": "clientId" + }, + { + "dataType": "STRING", + "singleValueField": true, + "name": "city" + }, + { + "dataType": "STRING", + "singleValueField": true, + "name": "description" + }, + { + "dataType": "INT", + "singleValueField": true, + "name": "salary" + } + ], + "timeFieldSpec": { + "incomingGranularitySpec": { + "timeType": "DAYS", + "dataType": "INT", + "name": "DaysSinceEpoch" + } + }, + "schemaName": "mytable" +} + diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java index 9149b9c..3891bf9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java @@ -88,6 +88,7 @@ public class TableConfigBuilder { private List<String> _rangeIndexColumns; private Map<String, String> _streamConfigs; private SegmentPartitionConfig _segmentPartitionConfig; + private boolean _nullHandlingEnabled; private TableCustomConfig _customConfig; private QuotaConfig _quotaConfig; @@ -257,6 +258,11 @@ public class TableConfigBuilder { return this; } + public TableConfigBuilder setNullHandlingEnabled(boolean nullHandlingEnabled) { + _nullHandlingEnabled = nullHandlingEnabled; + return this; + } + public TableConfigBuilder setCustomConfig(TableCustomConfig customConfig) { _customConfig = customConfig; return this; @@ -351,6 +357,7 @@ public class TableConfigBuilder { indexingConfig.setRangeIndexColumns(_rangeIndexColumns); indexingConfig.setStreamConfigs(_streamConfigs); indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig); + indexingConfig.setNullHandlingEnabled(_nullHandlingEnabled); if (_customConfig == null) { _customConfig = new TableCustomConfig(null); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org