This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch hotfix-minmax
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/hotfix-minmax by this push:
     new 5bbfd2b  Fix NPE for aggregate metrics (#5862)
5bbfd2b is described below

commit 5bbfd2bf643e0e4d5de94ba64adbaee309eed701
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Aug 13 21:56:25 2020 -0700

    Fix NPE for aggregate metrics (#5862)

    For real-time tables with aggregate metrics enabled, the metric columns have
    `null` min/max values in the column stats while the segment is created, which
    causes an `NPE`. Added a null check to prevent the exception. Also added an
    integration test for aggregate metrics.
---
 .../creator/impl/SegmentColumnarIndexCreator.java  |  25 ++--
 .../AggregateMetricsClusterIntegrationTest.java    | 130 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 17 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 34131ab..233391b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -532,30 +532,21 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
     }
 
-    String minValue = columnIndexCreationInfo.getMin().toString();
-    String maxValue = columnIndexCreationInfo.getMax().toString();
+    // NOTE: Min/max could be null for real-time aggregate metrics.
+    Object min = columnIndexCreationInfo.getMin();
+    Object max = columnIndexCreationInfo.getMax();
+    if (min != null && max != null) {
+      addColumnMinMaxValueInfo(properties, column, min.toString(), max.toString());
+    }
+
     String defaultNullValue = columnIndexCreationInfo.getDefaultNullValue().toString();
-    if (dataType == DataType.STRING) {
-      // Check special characters for STRING column
-      if (isValidPropertyValue(minValue)) {
-        properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
-      }
-      if (isValidPropertyValue(maxValue)) {
-        properties.setProperty(getKeyFor(column, MAX_VALUE), maxValue);
-      }
-      if (isValidPropertyValue(defaultNullValue)) {
-        properties.setProperty(getKeyFor(column, DEFAULT_NULL_VALUE), defaultNullValue);
-      }
-    } else {
-      properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
-      properties.setProperty(getKeyFor(column, MAX_VALUE), maxValue);
+    if (isValidPropertyValue(defaultNullValue)) {
       properties.setProperty(getKeyFor(column, DEFAULT_NULL_VALUE), defaultNullValue);
     }
   }
 
   public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column, String minValue,
       String maxValue) {
-    // Check special characters for STRING column
     if (isValidPropertyValue(minValue)) {
       properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
     }
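The change above is the standard guard-before-dereference pattern for nullable stats: when either min or max is absent, the min/max metadata is simply not written, instead of calling toString() on a null. Below is a minimal, self-contained sketch of the same idea; ColumnStats and writeMinMax are hypothetical stand-ins for Pinot's ColumnIndexCreationInfo and addColumnMinMaxValueInfo, and the values in main are illustrative only.

// A minimal sketch of the null guard applied in this commit. ColumnStats and
// writeMinMax are hypothetical stand-ins, not part of the Pinot API.
import java.util.Map;
import java.util.TreeMap;

public class MinMaxNullGuardSketch {

  // Minimal stats holder; with aggregate metrics, min/max may never be computed.
  static final class ColumnStats {
    final Object min; // may be null for real-time aggregate metrics
    final Object max; // may be null for real-time aggregate metrics

    ColumnStats(Object min, Object max) {
      this.min = min;
      this.max = max;
    }
  }

  static void writeMinMax(Map<String, String> properties, String column, ColumnStats stats) {
    Object min = stats.min;
    Object max = stats.max;
    // Guard before calling toString(): this is the check the commit adds.
    if (min != null && max != null) {
      properties.put(column + ".minValue", min.toString());
      properties.put(column + ".maxValue", max.toString());
    }
  }

  public static void main(String[] args) {
    Map<String, String> properties = new TreeMap<>();
    writeMinMax(properties, "ArrDelay", new ColumnStats(null, null)); // skipped, no NPE
    writeMinMax(properties, "AirTime", new ColumnStats(10L, 695L));   // illustrative values
    System.out.println(properties); // {AirTime.maxValue=695, AirTime.minValue=10}
  }
}

One consequence of the fix is that segments built with aggregate metrics may carry no min/max entries at all for those columns, so readers of the segment metadata have to tolerate their absence.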
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
new file mode 100644
index 0000000..4f6d80f
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test that enables aggregate metrics for the LLC real-time table.
+ */
+public class AggregateMetricsClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    // Start Kafka
+    startKafka();
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload the schema and table config with reduced number of columns and aggregate metrics on
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(getSchemaName()).addSingleValueDimension("Carrier", DataType.STRING)
+            .addSingleValueDimension("Origin", DataType.STRING).addMetric("AirTime", DataType.LONG)
+            .addMetric("ArrDelay", DataType.DOUBLE)
+            .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
+    addSchema(schema);
+    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    indexingConfig.setSortedColumn(Collections.singletonList("Carrier"));
+    indexingConfig.setInvertedIndexColumns(Collections.singletonList("Origin"));
+    indexingConfig.setNoDictionaryColumns(Arrays.asList("AirTime", "ArrDelay"));
+    indexingConfig.setRangeIndexColumns(Collections.singletonList("DaysSinceEpoch"));
+    indexingConfig.setBloomFilterColumns(Collections.singletonList("Origin"));
+    indexingConfig.setAggregateMetrics(true);
+    addTableConfig(tableConfig);
+
+    // Push data into Kafka
+    pushAvroIntoKafka(avroFiles);
+
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Override
+  protected boolean useLlc() {
+    // NOTE: Aggregate metrics is only available with LLC.
+    return true;
+  }
+
+  @Override
+  protected void waitForAllDocsLoaded(long timeoutMs) {
+    // NOTE: For aggregate metrics, we need to test the aggregation result instead of the document count because
+    //       documents can be merged during ingestion.
+    String sql = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResult = postSqlQuery(sql, _brokerBaseApiUrl);
+        JsonNode aggregationResults = queryResult.get("resultTable").get("rows").get(0);
+        return aggregationResults.get(0).asInt() == -165429728 && aggregationResults.get(1).asInt() == -175625957;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, timeoutMs, "Failed to load all documents");
+  }
+
+  @Test
+  public void testQueries()
+      throws Exception {
+    String sql = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
+    testSqlQuery(sql, Collections.singletonList(sql));
+    sql = "SELECT SUM(AirTime), DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch ORDER BY SUM(AirTime) DESC";
+    testSqlQuery(sql, Collections.singletonList(sql));
+    sql = "SELECT Origin, SUM(ArrDelay) FROM mytable WHERE Carrier = 'AA' GROUP BY Origin ORDER BY Origin";
+    testSqlQuery(sql, Collections.singletonList(sql));
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+}
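One detail of the test worth noting: because aggregate metrics merge rows at ingestion time, the document count is not a stable readiness signal, so waitForAllDocsLoaded is overridden to poll an aggregation query until it returns totals precomputed from the source data. The loop below is a generic sketch of that polling pattern and assumes nothing from Pinot; in the actual test, TestUtils.waitForCondition plays this role, and returning null from the lambda on exception makes transient query failures count as "not ready yet" rather than failing the wait.

import java.util.function.Supplier;

public final class PollUntil {

  // Generic version of the readiness check used above: poll a condition until it
  // holds or the timeout elapses. A hypothetical sketch, not the Pinot TestUtils API.
  public static void waitForCondition(Supplier<Boolean> condition, long intervalMs, long timeoutMs,
      String failureMessage) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      Boolean ready;
      try {
        ready = condition.get();
      } catch (Exception e) {
        ready = null; // treat exceptions as "not ready yet", as the test's lambda does
      }
      if (Boolean.TRUE.equals(ready)) {
        return;
      }
      Thread.sleep(intervalMs);
    }
    throw new AssertionError(failureMessage);
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    // Succeeds once ~500 ms have elapsed; would throw AssertionError on timeout.
    waitForCondition(() -> System.currentTimeMillis() - start > 500, 100L, 10_000L,
        "Condition never became true");
    System.out.println("Condition met");
  }
}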