This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8f117b9224b Fix RefreshSegmentMinionClusterIntegrationTest.checkColumnAddition() (#16125) 8f117b9224b is described below commit 8f117b9224b1a019087655d75c40e20ce3adcf46 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Jun 17 14:08:13 2025 -0600 Fix RefreshSegmentMinionClusterIntegrationTest.checkColumnAddition() (#16125) --- ...RefreshSegmentMinionClusterIntegrationTest.java | 79 ++++++---------------- 1 file changed, 20 insertions(+), 59 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java index 8f76ce7f6eb..c0921838682 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java @@ -20,12 +20,12 @@ package org.apache.pinot.integration.tests; import com.fasterxml.jackson.databind.JsonNode; import java.io.File; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import org.apache.commons.io.FileUtils; import org.apache.helix.task.TaskState; @@ -36,17 +36,14 @@ import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.segment.spi.index.StandardIndexes; -import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.TransformConfig; -import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -274,78 +271,40 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg @Test(priority = 4) public void checkColumnAddition() throws Exception { - long numTotalDocs = getCountStarResult(); Schema schema = createSchema(); - schema.addField(new MetricFieldSpec("NewAddedIntMetric", FieldSpec.DataType.INT, 1)); - schema.addField(new MetricFieldSpec("NewAddedLongMetric", FieldSpec.DataType.LONG, 1)); - schema.addField(new MetricFieldSpec("NewAddedFloatMetric", FieldSpec.DataType.FLOAT)); - schema.addField(new MetricFieldSpec("NewAddedDoubleMetric", FieldSpec.DataType.DOUBLE)); - schema.addField(new MetricFieldSpec("NewAddedBigDecimalMetric", FieldSpec.DataType.BIG_DECIMAL)); - schema.addField(new MetricFieldSpec("NewAddedBytesMetric", FieldSpec.DataType.BYTES)); - schema.addField(new DimensionFieldSpec("NewAddedMVIntDimension", FieldSpec.DataType.INT, false)); - schema.addField(new DimensionFieldSpec("NewAddedMVLongDimension", FieldSpec.DataType.LONG, false)); - schema.addField(new DimensionFieldSpec("NewAddedMVFloatDimension", FieldSpec.DataType.FLOAT, false)); - schema.addField(new DimensionFieldSpec("NewAddedMVDoubleDimension", FieldSpec.DataType.DOUBLE, false)); - schema.addField(new DimensionFieldSpec("NewAddedMVBooleanDimension", FieldSpec.DataType.BOOLEAN, false)); - schema.addField(new DimensionFieldSpec("NewAddedMVTimestampDimension", FieldSpec.DataType.TIMESTAMP, false)); - schema.addField(new DimensionFieldSpec("NewAddedMVStringDimension", FieldSpec.DataType.STRING, false)); - schema.addField(new DimensionFieldSpec("NewAddedSVJSONDimension", FieldSpec.DataType.JSON, true)); - schema.addField(new DimensionFieldSpec("NewAddedSVBytesDimension", FieldSpec.DataType.BYTES, true)); - schema.addField( - new DateTimeFieldSpec("NewAddedDerivedHoursSinceEpoch", FieldSpec.DataType.INT, "EPOCH|HOURS", "1:DAYS")); - schema.addField( - new DateTimeFieldSpec("NewAddedDerivedTimestamp", FieldSpec.DataType.TIMESTAMP, "TIMESTAMP", "1:DAYS")); - schema.addField(new DimensionFieldSpec("NewAddedDerivedSVBooleanDimension", FieldSpec.DataType.BOOLEAN, true)); - schema.addField(new DimensionFieldSpec("NewAddedDerivedMVStringDimension", FieldSpec.DataType.STRING, false)); schema.addField(new DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDs", FieldSpec.DataType.INT, false)); schema.addField(new DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDsString", FieldSpec.DataType.STRING, false)); schema.addField(new DimensionFieldSpec("NewAddedRawDerivedStringDimension", FieldSpec.DataType.STRING, true)); schema.addField(new DimensionFieldSpec("NewAddedRawDerivedMVIntDimension", FieldSpec.DataType.INT, false)); - schema.addField(new DimensionFieldSpec("NewAddedDerivedMVDoubleDimension", FieldSpec.DataType.DOUBLE, false)); schema.addField(new DimensionFieldSpec("NewAddedDerivedNullString", FieldSpec.DataType.STRING, true, "nil")); schema.setEnableColumnBasedNullHandling(true); - addSchema(schema); + updateSchema(schema); TableConfig tableConfig = getOfflineTableConfig(); List<TransformConfig> transformConfigs = - Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24"), - new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000"), - new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0"), - new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"), - new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs"), + List.of(new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs"), new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs"), new TransformConfig("NewAddedRawDerivedStringDimension", "reverse(DestCityName)"), new TransformConfig("NewAddedRawDerivedMVIntDimension", "ActualElapsedTime"), - new TransformConfig("NewAddedDerivedMVDoubleDimension", "ArrDelayMinutes"), new TransformConfig("NewAddedDerivedNullString", "caseWhen(true, null, null)")); - IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); tableConfig.setIngestionConfig(ingestionConfig); - - // Ensure that we can reload segments with a new raw derived column - tableConfig.getIndexingConfig().getNoDictionaryColumns().add("NewAddedRawDerivedStringDimension"); - tableConfig.getIndexingConfig().getNoDictionaryColumns().add("NewAddedRawDerivedMVIntDimension"); - List<FieldConfig> fieldConfigList = new ArrayList<>(); - fieldConfigList.add( - new FieldConfig("NewAddedDerivedDivAirportSeqIDs", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), - FieldConfig.CompressionCodec.MV_ENTRY_DICT, null)); - fieldConfigList.add(new FieldConfig("NewAddedDerivedDivAirportSeqIDsString", FieldConfig.EncodingType.DICTIONARY, - Collections.emptyList(), FieldConfig.CompressionCodec.MV_ENTRY_DICT, null)); + List<String> noDictionaryColumns = tableConfig.getIndexingConfig().getNoDictionaryColumns(); + assertNotNull(noDictionaryColumns); + noDictionaryColumns.add("NewAddedRawDerivedStringDimension"); + noDictionaryColumns.add("NewAddedRawDerivedMVIntDimension"); updateTableConfig(tableConfig); String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); - assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() - .setTablesToSchedule(Collections.singleton(offlineTableName))) + assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext().setTablesToSchedule(Set.of(offlineTableName))) .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); // Will not schedule task if there's incomplete task - MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext() - .setTablesToSchedule(Collections.singleton(offlineTableName)) - .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)), - _taskManager); + MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext().setTablesToSchedule(Set.of(offlineTableName)) + .setTasksToSchedule(Set.of(MinionConstants.RefreshSegmentTask.TASK_TYPE)), _taskManager); waitForTaskToComplete(); // Check that metadata contains processed times. @@ -356,22 +315,24 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg assertTrue(customMap.containsKey(refreshKey)); } + String referenceQuery = "SELECT COUNT(*) FROM mytable WHERE ActualElapsedTime > 150"; + int referenceCount = postQuery(referenceQuery).get("resultTable").get("rows").get(0).get(0).asInt(); waitForServerSegmentDownload(aVoid -> { try { - String query = "SELECT COUNT(*) FROM mytable WHERE NewAddedIntMetric = 1"; + String query = "SELECT COUNT(*) FROM mytable WHERE NewAddedRawDerivedMVIntDimension > 150"; JsonNode response = postQuery(query); - return response.get("resultTable").get("rows").get(0).get(0).asLong() == numTotalDocs; + return response.get("resultTable").get("rows").get(0).get(0).asLong() == referenceCount; } catch (Exception e) { throw new RuntimeException(e); } }); // Verify the index sizes - JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(sendGetRequest( - _controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(), - List.of("DivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDsString", - "NewAddedRawDerivedStringDimension", "NewAddedRawDerivedMVIntDimension", - "NewAddedDerivedNullString")))) + List<String> columns = + List.of("DivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDsString", + "NewAddedRawDerivedStringDimension", "NewAddedRawDerivedMVIntDimension", "NewAddedDerivedNullString"); + JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode( + sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(), columns))) .get("columnIndexSizeMap"); assertEquals(columnIndexSizeMap.size(), 6); JsonNode originalColumnIndexSizes = columnIndexSizeMap.get("DivAirportSeqIDs"); @@ -389,7 +350,7 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg assertTrue( derivedStringColumnIndexSizes.get(StandardIndexes.DICTIONARY_ID).asDouble() > originalColumnDictionarySize); - // Both derived columns should have smaller forward index size than the original column because of compression + // Both derived columns should have same forward index size double derivedColumnForwardIndexSize = derivedColumnIndexSizes.get(StandardIndexes.FORWARD_ID).asDouble(); assertEquals(derivedStringColumnIndexSizes.get(StandardIndexes.FORWARD_ID).asDouble(), derivedColumnForwardIndexSize); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org