This is an automated email from the ASF dual-hosted git repository.

vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a9abd143f0 Minion Task to support automatic Segment Refresh (#14300) a9abd143f0 is described below commit a9abd143f0e0c8dee991239931d59f4db186093d Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com> AuthorDate: Thu Nov 21 01:04:52 2024 +0530 Minion Task to support automatic Segment Refresh (#14300) * Minion Task to support automatic Segment Refresh * Address review comments * Address review comments. --- .../api/resources/PinotTableRestletResource.java | 6 +- .../helix/core/PinotHelixResourceManager.java | 16 +- .../apache/pinot/core/common/MinionConstants.java | 25 +- ...RefreshSegmentMinionClusterIntegrationTest.java | 463 +++++++++++++++++++++ .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 18 + .../refreshsegment/RefreshSegmentTaskExecutor.java | 209 ++++++++++ .../RefreshSegmentTaskExecutorFactory.java | 49 +++ .../RefreshSegmentTaskGenerator.java | 171 ++++++++ .../RefreshSegmentTaskProgressObserverFactory.java | 30 +- ...ableStats.java => TableStatsHumanReadable.java} | 6 +- 10 files changed, 961 insertions(+), 32 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 040c57c885..2ece7deb0d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -114,7 +114,7 @@ import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableStats; +import org.apache.pinot.spi.config.table.TableStatsHumanReadable; import org.apache.pinot.spi.config.table.TableStatus; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; @@ -883,13 +883,13 @@ public class PinotTableRestletResource { if ((tableTypeStr == null || TableType.OFFLINE.name().equalsIgnoreCase(tableTypeStr)) && _pinotHelixResourceManager.hasOfflineTable(tableName)) { String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName); - TableStats tableStats = _pinotHelixResourceManager.getTableStats(tableNameWithType); + TableStatsHumanReadable tableStats = _pinotHelixResourceManager.getTableStatsHumanReadable(tableNameWithType); ret.set(TableType.OFFLINE.name(), JsonUtils.objectToJsonNode(tableStats)); } if ((tableTypeStr == null || TableType.REALTIME.name().equalsIgnoreCase(tableTypeStr)) && _pinotHelixResourceManager.hasRealtimeTable(tableName)) { String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); - TableStats tableStats = _pinotHelixResourceManager.getTableStats(tableNameWithType); + TableStatsHumanReadable tableStats = _pinotHelixResourceManager.getTableStatsHumanReadable(tableNameWithType); ret.set(TableType.REALTIME.name(), JsonUtils.objectToJsonNode(tableStats)); } return ret.toString(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 099cf4b5e8..e7affa4287 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -160,7 +160,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.instance.Instance; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableStats; +import org.apache.pinot.spi.config.table.TableStatsHumanReadable; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TagOverrideConfig; import org.apache.pinot.spi.config.table.TenantConfig; @@ -4240,12 +4240,22 @@ public class PinotHelixResourceManager { return onlineSegments; } - public TableStats getTableStats(String tableNameWithType) { + public TableStatsHumanReadable getTableStatsHumanReadable(String tableNameWithType) { String zkPath = ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType); Stat stat = _propertyStore.getStat(zkPath, AccessOption.PERSISTENT); Preconditions.checkState(stat != null, "Failed to read ZK stats for table: %s", tableNameWithType); String creationTime = SIMPLE_DATE_FORMAT.format(Instant.ofEpochMilli(stat.getCtime())); - return new TableStats(creationTime); + return new TableStatsHumanReadable(creationTime); + } + + public Stat getTableStat(String tableNameWithType) { + String zkPath = ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType); + return _propertyStore.getStat(zkPath, AccessOption.PERSISTENT); + } + + public Stat getSchemaStat(String schemaName) { + String zkPath = ZKMetadataProvider.constructPropertyStorePathForSchema(schemaName); + return _propertyStore.getStat(zkPath, AccessOption.PERSISTENT); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 6ad497a4a3..26e0bd79ed 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -162,8 +162,29 @@ public class MinionConstants { // Generate segment and push to controller based on batch ingestion configs public static class SegmentGenerationAndPushTask { public static final String TASK_TYPE = "SegmentGenerationAndPushTask"; - public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE = - "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance"; + } + + /** + * Minion task to refresh segments when there are changes to table configs and schemas. This task currently supports + * the following functionality: + * 1. Adding/Removing/Updating indexes. + * 2. Adding new columns (also supports transform configs for new columns). + * 3. Converting segment versions. + * 4. Compatible datatype changes to columns (note that the minion task will fail if the data in the column is not + * compatible with the target datatype). + * + * This is an alternative to performing a reload of existing segments on servers. The reload on servers is + * sub-optimal for many reasons: + * 1. Requires an explicit reload call when index configurations change. + * 2. Is very slow. Happens one (or a few, configurable) segment at a time to avoid query impact. + * 3. Compute cost is paid on all servers hosting the segment. + * 4. Increases server startup time as more and more segments require reload. 
+ */ + public static class RefreshSegmentTask { + public static final String TASK_TYPE = "RefreshSegmentTask"; + + // Maximum number of tasks to create per table per run. + public static final int MAX_NUM_TASKS_PER_TABLE = 20; } public static class UpsertCompactionTask { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java new file mode 100644 index 0000000000..7f91a8671e --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.apache.commons.io.FileUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexingConfig; +import org.apache.pinot.spi.config.table.QuotaConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterIntegrationTest { + protected PinotHelixTaskResourceManager _helixTaskResourceManager; + protected PinotTaskManager _taskManager; 
+ protected PinotHelixResourceManager _pinotHelixResourceManager; + protected final File _segmentDataDir = new File(_tempDir, "segmentDataDir"); + protected final File _segmentTarDir = new File(_tempDir, "segmentTarDir"); + + @BeforeClass + public void setUp() throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDataDir, _segmentTarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServer(); + startMinion(); + + // Create schema and tableConfig + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + tableConfig.setTaskConfig(getRefreshSegmentTaskConfig()); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + // Create segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDataDir, + _segmentTarDir); + uploadSegments(getTableName(), _segmentTarDir); + + _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager(); + _taskManager = _controllerStarter.getTaskManager(); + _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager(); + } + + @AfterClass + public void tearDown() throws Exception { + stopMinion(); + stopServer(); + stopBroker(); + stopController(); + stopZk(); + FileUtils.deleteDirectory(_tempDir); + } + + @Test(priority = 1) + public void testFirstSegmentRefresh() { + // This will create the inverted index as we disable inverted index creation during segment push. + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); + assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + assertTrue(_helixTaskResourceManager.getTaskQueues() + .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); + // Will not schedule task if there's incomplete task + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + waitForTaskToComplete(); + + // Check that metadata contains expected values + Map<String, String> segmentRefreshTime = new HashMap<>(); + String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; + for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { + // Get the value in segment metadata + Map<String, String> customMap = metadata.getCustomMap(); + assertTrue(customMap.containsKey(refreshKey)); + segmentRefreshTime.put(metadata.getSegmentName(), customMap.get(refreshKey)); + } + + // This should be no-op as nothing changes. 
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { + // Get the value in segment metadata + Map<String, String> customMap = metadata.getCustomMap(); + assertTrue( + customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)); + assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), customMap.get(refreshKey), + "Refresh Time doesn't match"); + } + } + + @Test(priority = 2) + public void testValidDatatypeChange() throws Exception { + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); + + // Change datatypes for a few columns, e.g. ArrTime from INT -> LONG and AirlineID from LONG -> STRING + deleteSchema(getTableName()); + Schema schema = createSchema(); + schema.getFieldSpecFor("ArrTime").setDataType(FieldSpec.DataType.LONG); + schema.getFieldSpecFor("AirlineID").setDataType(FieldSpec.DataType.STRING); + schema.getFieldSpecFor("ActualElapsedTime").setDataType(FieldSpec.DataType.FLOAT); + schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.STRING); + addSchema(schema); + + assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + assertTrue(_helixTaskResourceManager.getTaskQueues() + .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); + // Will not schedule task if there's incomplete task + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + waitForTaskToComplete(); + + waitForServerSegmentDownload(aVoid -> { + try { + String query = "SELECT ArrTime FROM mytable LIMIT 10"; + JsonNode response = postQuery(query); + return response.get("resultTable").get("dataSchema").get("columnDataTypes").toString().contains("LONG"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + waitForServerSegmentDownload(aVoid -> { + try { + String query = "SELECT AirlineID FROM mytable LIMIT 10"; + JsonNode response = postQuery(query); + return response.get("resultTable").get("dataSchema").get("columnDataTypes").toString().contains("STRING"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + waitForServerSegmentDownload(aVoid -> { + try { + String query = "SELECT ActualElapsedTime FROM mytable LIMIT 10"; + JsonNode response = postQuery(query); + return response.get("resultTable").get("dataSchema").get("columnDataTypes").toString().contains("FLOAT"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + waitForServerSegmentDownload(aVoid -> { + try { + String query = "SELECT DestAirportID FROM mytable LIMIT 10"; + JsonNode response = postQuery(query); + return response.get("resultTable").get("dataSchema").get("columnDataTypes").toString().contains("STRING"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Reset the schema back to its original state. 
+ deleteSchema(getTableName()); + schema = createSchema(); + schema.getFieldSpecFor("ArrTime").setDataType(FieldSpec.DataType.INT); + schema.getFieldSpecFor("AirlineID").setDataType(FieldSpec.DataType.LONG); + schema.getFieldSpecFor("ActualElapsedTime").setDataType(FieldSpec.DataType.INT); + schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.INT); + addSchema(schema); + } + + @Test(priority = 3) + public void testIndexChanges() throws Exception { + /** + * Adding bare-minimum tests for addition and removal of indexes. The segment generation code already + * has enough tests and testing each index addition/removal does not seem necessary. + */ + + // Current inverted index columns are "FlightNum", "Origin", "Quarter" + String query = "SELECT * FROM mytable WHERE flightNum = 3151 LIMIT 10"; + assertEquals(postQuery(query).get("numEntriesScannedInFilter").asLong(), 0L); + query = "SELECT * from mytable where Origin = 'SFO' LIMIT 10"; + assertEquals(postQuery(query).get("numEntriesScannedInFilter").asLong(), 0L); + query = "SELECT * from mytable where Quarter = 1 LIMIT 10"; + assertEquals(postQuery(query).get("numEntriesScannedInFilter").asLong(), 0L); + + TableConfig tableConfig = getOfflineTableConfig(); + IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); + // Add inverted index for DivActualElapsedTime + // Remove inverted index for "FlightNum" + indexingConfig.setInvertedIndexColumns(Arrays.asList("DivActualElapsedTime", "Origin", "Quarter")); + updateTableConfig(tableConfig); + + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); + assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + assertTrue(_helixTaskResourceManager.getTaskQueues() + .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); + // Will not schedule task if there's incomplete task + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + waitForTaskToComplete(); + + waitForServerSegmentDownload(aVoid -> { + try { + String newQuery = "SELECT * FROM mytable where flightNum = 3151 LIMIT 10"; + return postQuery(newQuery).get("numEntriesScannedInFilter").asLong() > 0; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + waitForServerSegmentDownload(aVoid -> { + try { + String newQuery = "SELECT * FROM mytable where DivActualElapsedTime = 305 LIMIT 10"; + return postQuery(newQuery).get("numEntriesScannedInFilter").asLong() == 0; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Test(priority = 4) + public void checkColumnAddition() throws Exception { + long numTotalDocs = getCountStarResult(); + Schema schema = createSchema(); + schema.addField(new MetricFieldSpec("NewAddedIntMetric", FieldSpec.DataType.INT, 1)); + schema.addField(new MetricFieldSpec("NewAddedLongMetric", FieldSpec.DataType.LONG, 1)); + schema.addField(new MetricFieldSpec("NewAddedFloatMetric", FieldSpec.DataType.FLOAT)); + schema.addField(new MetricFieldSpec("NewAddedDoubleMetric", FieldSpec.DataType.DOUBLE)); + schema.addField(new MetricFieldSpec("NewAddedBigDecimalMetric", FieldSpec.DataType.BIG_DECIMAL)); + schema.addField(new MetricFieldSpec("NewAddedBytesMetric", FieldSpec.DataType.BYTES)); + schema.addField(new DimensionFieldSpec("NewAddedMVIntDimension", FieldSpec.DataType.INT, false)); + schema.addField(new 
DimensionFieldSpec("NewAddedMVLongDimension", FieldSpec.DataType.LONG, false)); + schema.addField(new DimensionFieldSpec("NewAddedMVFloatDimension", FieldSpec.DataType.FLOAT, false)); + schema.addField(new DimensionFieldSpec("NewAddedMVDoubleDimension", FieldSpec.DataType.DOUBLE, false)); + schema.addField(new DimensionFieldSpec("NewAddedMVBooleanDimension", FieldSpec.DataType.BOOLEAN, false)); + schema.addField(new DimensionFieldSpec("NewAddedMVTimestampDimension", FieldSpec.DataType.TIMESTAMP, false)); + schema.addField(new DimensionFieldSpec("NewAddedMVStringDimension", FieldSpec.DataType.STRING, false)); + schema.addField(new DimensionFieldSpec("NewAddedSVJSONDimension", FieldSpec.DataType.JSON, true)); + schema.addField(new DimensionFieldSpec("NewAddedSVBytesDimension", FieldSpec.DataType.BYTES, true)); + schema.addField( + new DateTimeFieldSpec("NewAddedDerivedHoursSinceEpoch", FieldSpec.DataType.INT, "EPOCH|HOURS", "1:DAYS")); + schema.addField( + new DateTimeFieldSpec("NewAddedDerivedTimestamp", FieldSpec.DataType.TIMESTAMP, "TIMESTAMP", "1:DAYS")); + schema.addField(new DimensionFieldSpec("NewAddedDerivedSVBooleanDimension", FieldSpec.DataType.BOOLEAN, true)); + schema.addField(new DimensionFieldSpec("NewAddedDerivedMVStringDimension", FieldSpec.DataType.STRING, false)); + schema.addField(new DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDs", FieldSpec.DataType.INT, false)); + schema.addField(new DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDsString", FieldSpec.DataType.STRING, false)); + schema.addField(new DimensionFieldSpec("NewAddedRawDerivedStringDimension", FieldSpec.DataType.STRING, true)); + schema.addField(new DimensionFieldSpec("NewAddedRawDerivedMVIntDimension", FieldSpec.DataType.INT, false)); + schema.addField(new DimensionFieldSpec("NewAddedDerivedMVDoubleDimension", FieldSpec.DataType.DOUBLE, false)); + schema.addField(new DimensionFieldSpec("NewAddedDerivedNullString", FieldSpec.DataType.STRING, true, "nil")); + schema.setEnableColumnBasedNullHandling(true); + addSchema(schema); + + TableConfig tableConfig = getOfflineTableConfig(); + List<TransformConfig> transformConfigs = + Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24"), + new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000"), + new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0"), + new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"), + new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs"), + new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs"), + new TransformConfig("NewAddedRawDerivedStringDimension", "reverse(DestCityName)"), + new TransformConfig("NewAddedRawDerivedMVIntDimension", "ActualElapsedTime"), + new TransformConfig("NewAddedDerivedMVDoubleDimension", "ArrDelayMinutes"), + new TransformConfig("NewAddedDerivedNullString", "caseWhen(true, null, null)")); + + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setTransformConfigs(transformConfigs); + tableConfig.setIngestionConfig(ingestionConfig); + + // Ensure that we can reload segments with a new raw derived column + tableConfig.getIndexingConfig().getNoDictionaryColumns().add("NewAddedRawDerivedStringDimension"); + tableConfig.getIndexingConfig().getNoDictionaryColumns().add("NewAddedRawDerivedMVIntDimension"); + List<FieldConfig> fieldConfigList = new ArrayList<>(); + fieldConfigList.add( + new 
FieldConfig("NewAddedDerivedDivAirportSeqIDs", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), + FieldConfig.CompressionCodec.MV_ENTRY_DICT, null)); + fieldConfigList.add(new FieldConfig("NewAddedDerivedDivAirportSeqIDsString", FieldConfig.EncodingType.DICTIONARY, + Collections.emptyList(), FieldConfig.CompressionCodec.MV_ENTRY_DICT, null)); + updateTableConfig(tableConfig); + + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); + + assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + assertTrue(_helixTaskResourceManager.getTaskQueues() + .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); + // Will not schedule task if there's incomplete task + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + waitForTaskToComplete(); + + // Check that metadata contains processed times. + String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; + for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { + // Get the value in segment metadata + Map<String, String> customMap = metadata.getCustomMap(); + assertTrue(customMap.containsKey(refreshKey)); + } + + waitForServerSegmentDownload(aVoid -> { + try { + String query = "SELECT COUNT(*) FROM mytable WHERE NewAddedIntMetric = 1"; + JsonNode response = postQuery(query); + return response.get("resultTable").get("rows").get(0).get(0).asLong() == numTotalDocs; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Verify the index sizes + JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(sendGetRequest( + _controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(), + List.of("DivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDsString", + "NewAddedRawDerivedStringDimension", "NewAddedRawDerivedMVIntDimension", + "NewAddedDerivedNullString")))) + .get("columnIndexSizeMap"); + assertEquals(columnIndexSizeMap.size(), 6); + JsonNode originalColumnIndexSizes = columnIndexSizeMap.get("DivAirportSeqIDs"); + JsonNode derivedColumnIndexSizes = columnIndexSizeMap.get("NewAddedDerivedDivAirportSeqIDs"); + JsonNode derivedStringColumnIndexSizes = columnIndexSizeMap.get("NewAddedDerivedDivAirportSeqIDsString"); + JsonNode derivedRawStringColumnIndex = columnIndexSizeMap.get("NewAddedRawDerivedStringDimension"); + JsonNode derivedRawMVIntColumnIndex = columnIndexSizeMap.get("NewAddedRawDerivedMVIntDimension"); + JsonNode derivedNullStringColumnIndex = columnIndexSizeMap.get("NewAddedDerivedNullString"); + + // Derived int column should have the same dictionary size as the original column + double originalColumnDictionarySize = originalColumnIndexSizes.get(StandardIndexes.DICTIONARY_ID).asDouble(); + assertEquals(derivedColumnIndexSizes.get(StandardIndexes.DICTIONARY_ID).asDouble(), originalColumnDictionarySize); + + // Derived string column should have larger dictionary size than the original column + assertTrue( + derivedStringColumnIndexSizes.get(StandardIndexes.DICTIONARY_ID).asDouble() > originalColumnDictionarySize); + + // Both derived columns should have smaller forward index size than the original column because of compression + double derivedColumnForwardIndexSize = derivedColumnIndexSizes.get(StandardIndexes.FORWARD_ID).asDouble(); + 
assertEquals(derivedStringColumnIndexSizes.get(StandardIndexes.FORWARD_ID).asDouble(), + derivedColumnForwardIndexSize); + + assertTrue(derivedRawStringColumnIndex.has(StandardIndexes.FORWARD_ID)); + assertFalse(derivedRawStringColumnIndex.has(StandardIndexes.DICTIONARY_ID)); + + assertTrue(derivedRawMVIntColumnIndex.has(StandardIndexes.FORWARD_ID)); + assertFalse(derivedRawMVIntColumnIndex.has(StandardIndexes.DICTIONARY_ID)); + + assertTrue(derivedNullStringColumnIndex.has(StandardIndexes.NULL_VALUE_VECTOR_ID)); + } + + @Test(priority = 5) + public void checkRefreshNotNecessary() throws Exception { + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); + + Map<String, Long> segmentCrc = new HashMap<>(); + for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { + segmentCrc.put(metadata.getSegmentName(), metadata.getCrc()); + } + + TableConfig tableConfig = getOfflineTableConfig(); + tableConfig.setQuotaConfig(new QuotaConfig(null, "10")); + + updateTableConfig(tableConfig); + + assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + assertTrue(_helixTaskResourceManager.getTaskQueues() + .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); + // Will not schedule task if there's incomplete task + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + waitForTaskToComplete(); + + // Check that metadata contains expected values + Map<String, String> segmentRefreshTime = new HashMap<>(); + + String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; + for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { + // Get the value in segment metadata + Map<String, String> customMap = metadata.getCustomMap(); + assertTrue(customMap.containsKey(refreshKey)); + segmentRefreshTime.put(metadata.getSegmentName(), customMap.get(refreshKey)); + assertEquals(segmentCrc.get(metadata.getSegmentName()), metadata.getCrc(), "CRC does not match"); + } + + // This should be no-op as nothing changes. 
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { + // Get the value in segment metadata + Map<String, String> customMap = metadata.getCustomMap(); + assertTrue( + customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)); + assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), customMap.get(refreshKey), + "Refresh Time doesn't match"); + } + } + + protected void waitForTaskToComplete() { + TestUtils.waitForCondition(input -> { + // Check task state + for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.RefreshSegmentTask.TASK_TYPE) + .values()) { + if (taskState != TaskState.COMPLETED) { + return false; + } + } + return true; + }, 600_000L, "Failed to complete task"); + } + + protected void waitForServerSegmentDownload(Function<Void, Boolean> conditionFunc) { + TestUtils.waitForCondition(aVoid -> { + boolean val = conditionFunc.apply(aVoid); + return val; + }, 60_000L, "Failed to meet condition"); + } + + private TableTaskConfig getRefreshSegmentTaskConfig() { + Map<String, String> tableTaskConfigs = new HashMap<>(); + return new TableTaskConfig( + Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE, tableTaskConfigs)); + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index 55dfb97f98..5e41720cde 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -19,10 +19,14 @@ package org.apache.pinot.plugin.minion.tasks; import java.net.URI; +import java.text.SimpleDateFormat; +import java.time.Instant; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TimeZone; import javax.annotation.Nullable; import org.apache.helix.HelixAdmin; import org.apache.helix.model.ExternalView; @@ -54,6 +58,9 @@ public class MinionTaskUtils { private static final String DEFAULT_DIR_PATH_TERMINATOR = "/"; + public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + public static final String UTC = "UTC"; + private MinionTaskUtils() { } @@ -235,4 +242,15 @@ public class MinionTaskUtils { } return validDocIds; } + + public static String toUTCString(long epochMillis) { + Date date = new Date(epochMillis); + SimpleDateFormat isoFormat = new SimpleDateFormat(DATETIME_PATTERN); + isoFormat.setTimeZone(TimeZone.getTimeZone(UTC)); + return isoFormat.format(date); + } + + public static long fromUTCString(String utcString) { + return Instant.parse(utcString).toEpochMilli(); + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java new file mode 100644 index 0000000000..2509ba3721 --- /dev/null +++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.refreshsegment; + +import java.io.File; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor; +import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils; +import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; +import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; +import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RefreshSegmentTaskExecutor extends BaseSingleSegmentConversionExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(RefreshSegmentTaskExecutor.class); + + private long _taskStartTime; + + /** + * The code here currently covers segment refresh for the following cases: + * 1. Processing newly added columns. + * 2. Addition/removal of indexes. + * 3. Compatible datatype changes for existing columns. + */ + @Override + protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir) + throws Exception { + _eventObserver.notifyProgress(pinotTaskConfig, "Refreshing segment: " + indexDir); + + // We set _taskStartTime before fetching the tableConfig. Task generation relies on tableConfig/schema updates + // happening after the last processed time, so we explicitly use the timestamp taken before fetching the + // tableConfig as the processedTime. 
+ _taskStartTime = System.currentTimeMillis(); + Map<String, String> configs = pinotTaskConfig.getConfigs(); + String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); + String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY); + String taskType = pinotTaskConfig.getTaskType(); + + LOGGER.info("Starting task: {} with configs: {}", taskType, configs); + + TableConfig tableConfig = getTableConfig(tableNameWithType); + Schema schema = getSchema(tableNameWithType); + + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, schema); + SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); + PinotConfiguration segmentDirectoryConfigs = indexLoadingConfig.getSegmentDirectoryConfigs(); + SegmentDirectoryLoaderContext segmentLoaderContext = + new SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig()) + .setSchema(schema) + .setInstanceId(indexLoadingConfig.getInstanceId()) + .setSegmentName(segmentMetadata.getName()) + .setSegmentCrc(segmentMetadata.getCrc()) + .setSegmentDirectoryConfigs(segmentDirectoryConfigs) + .build(); + SegmentDirectory segmentDirectory = + SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(indexDir.toURI(), segmentLoaderContext); + + // TODO: Instead of relying on needPreprocess(), process the segment metadata file to determine if a refresh is + // needed. The BaseDefaultColumnHandler part of needPreprocess() does not process any changes to existing columns + // like datatype, change from dimension to metric, etc. + boolean needPreprocess = ImmutableSegmentLoader.needPreprocess(segmentDirectory, indexLoadingConfig, schema); + closeSegmentDirectoryQuietly(segmentDirectory); + Set<String> refreshColumnSet = new HashSet<>(); + + for (FieldSpec fieldSpecInSchema : schema.getAllFieldSpecs()) { + // Virtual columns are constructed while loading the segment, thus do not exist in the record, nor should they + // be persisted to disk. + if (fieldSpecInSchema.isVirtualColumn()) { + continue; + } + + String column = fieldSpecInSchema.getName(); + ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column); + if (columnMetadata != null) { + FieldSpec fieldSpecInSegment = columnMetadata.getFieldSpec(); + + // Check that the data type and default value match. + FieldSpec.DataType dataTypeInSegment = fieldSpecInSegment.getDataType(); + FieldSpec.DataType dataTypeInSchema = fieldSpecInSchema.getDataType(); + + // Column exists in segment. + if (dataTypeInSegment != dataTypeInSchema) { + // Check if we need to update the data-type. The datatype change depends on the segmentGeneration code + // converting the object to the destination datatype. If the existing data in the column is not compatible + // with the destination datatype, the refresh task will fail. + refreshColumnSet.add(column); + } + + // TODO: Maybe we can support single-value to multi-value conversions and vice versa. + } else { + refreshColumnSet.add(column); + } + } + + if (!needPreprocess && refreshColumnSet.isEmpty()) { + LOGGER.info("Skipping segment={}, table={} as it is up-to-date with new table/schema", segmentName, + tableNameWithType); + // We just need to update the ZK metadata with the last refresh time to avoid getting picked up again. As the CRC + // check will match, this will only end up being a ZK update. 
+ return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType) + .setFile(indexDir) + .setSegmentName(segmentName) + .build(); + } + + // Refresh the segment. Segment reload is achieved by generating a new segment from scratch using the updated schema + // and table configs. + try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) { + recordReader.init(indexDir, null, null); + SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName, + getSchema(tableNameWithType)); + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, recordReader); + driver.build(); + } + + File refreshedSegmentFile = new File(workingDir, segmentName); + SegmentConversionResult result = new SegmentConversionResult.Builder().setFile(refreshedSegmentFile) + .setTableNameWithType(tableNameWithType) + .setSegmentName(segmentName) + .build(); + + long endMillis = System.currentTimeMillis(); + LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, + (endMillis - _taskStartTime)); + + return result; + } + + private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig, + SegmentMetadataImpl segmentMetadata, String segmentName, Schema schema) { + // Inverted index creation is disabled by default during segment generation, typically to reduce segment push times + // from external sources like HDFS. Also, if we did not create the inverted index here, the segment would always be + // flagged as needing reload, causing the segment refresh to be triggered again. + tableConfig.getIndexingConfig().setCreateInvertedIndexDuringSegmentGeneration(true); + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setOutDir(workingDir.getPath()); + config.setSegmentName(segmentName); + + // Keep the index creation time the same as the original segment because both segments use the same raw data. + // This way, for the REFRESH case, when the new segment gets pushed to the controller, we can use the index + // creation time to identify if the newly pushed segment has newer data than the existing one. + config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime())); + + // The time column type info is not stored in the segment metadata. + // Keep segment start/end time to properly handle time column types other than EPOCH (e.g. SIMPLE_DATE_FORMAT). 
+ if (segmentMetadata.getTimeInterval() != null) { + config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName()); + config.setStartTime(Long.toString(segmentMetadata.getStartTime())); + config.setEndTime(Long.toString(segmentMetadata.getEndTime())); + config.setSegmentTimeUnit(segmentMetadata.getTimeUnit()); + } + return config; + } + + private static void closeSegmentDirectoryQuietly(SegmentDirectory segmentDirectory) { + if (segmentDirectory != null) { + try { + segmentDirectory.close(); + } catch (Exception e) { + LOGGER.warn("Failed to close SegmentDirectory due to error: {}", e.getMessage()); + } + } + } + + @Override + protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig, + SegmentConversionResult segmentConversionResult) { + return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, + Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX, + MinionTaskUtils.toUTCString(_taskStartTime))); + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutorFactory.java new file mode 100644 index 0000000000..5214d46645 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutorFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.minion.tasks.refreshsegment; + +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.minion.MinionConf; +import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager; +import org.apache.pinot.minion.executor.PinotTaskExecutor; +import org.apache.pinot.minion.executor.PinotTaskExecutorFactory; +import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory; + + +@TaskExecutorFactory +public class RefreshSegmentTaskExecutorFactory implements PinotTaskExecutorFactory { + + @Override + public void init(MinionTaskZkMetadataManager zkMetadataManager) { + } + + @Override + public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) { + } + + @Override + public String getTaskType() { + return MinionConstants.RefreshSegmentTask.TASK_TYPE; + } + + @Override + public PinotTaskExecutor create() { + return new RefreshSegmentTaskExecutor(); + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java new file mode 100644 index 0000000000..59e85c1b1e --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ +package org.apache.pinot.plugin.minion.tasks.refreshsegment; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.common.data.Segment; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; +import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.RefreshSegmentTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils; +import org.apache.pinot.spi.annotations.minion.TaskGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@TaskGenerator +public class RefreshSegmentTaskGenerator extends BaseTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(RefreshSegmentTaskGenerator.class); + + @Override + public String getTaskType() { + return RefreshSegmentTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = RefreshSegmentTask.TASK_TYPE; + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager(); + + int tableNumTasks = 0; + + for (TableConfig tableConfig : tableConfigs) { + String tableNameWithType = tableConfig.getTableName(); + LOGGER.info("Start generating RefreshSegment tasks for table: {}", tableNameWithType); + + // Get the task configs for the table. This is used to restrict the maximum number of allowed tasks per table at + // any given point. + Map<String, String> taskConfigs; + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + if (tableTaskConfig == null) { + LOGGER.warn("Failed to find task config for table: {}", tableNameWithType); + continue; + } + taskConfigs = tableTaskConfig.getConfigsForTaskType(RefreshSegmentTask.TASK_TYPE); + Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableNameWithType); + int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE; + String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY); + if (tableMaxNumTasksConfig != null) { + try { + tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig); + } catch (Exception e) { + tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE; + LOGGER.warn("MaxNumTasks has been wrongly set for table: {} and task: {}", tableNameWithType, taskType); + } + } + + // Get info about the table and schema. + Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType); + Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); + Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName()); + + // Get the running segments for the table. 
+ Set<Segment> runningSegments = + TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, _clusterInfoAccessor); + + // Make a single ZK call to get the segments. + List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType); + + for (SegmentZKMetadata segmentZKMetadata : allSegments) { + // Skip if we have reached the maximum number of permissible tasks per iteration. + if (tableNumTasks >= tableMaxNumTasks) { + break; + } + + // Skip consuming segments. + if (tableConfig.getTableType() == TableType.REALTIME && !segmentZKMetadata.getStatus().isCompleted()) { + continue; + } + + // Skip segments for which a task is already running. + if (runningSegments.contains(new Segment(tableNameWithType, segmentZKMetadata.getSegmentName()))) { + continue; + } + + String segmentName = segmentZKMetadata.getSegmentName(); + + // Skip if the segment is already up-to-date and doesn't have to be refreshed. + if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, schemaStat)) { + continue; + } + + Map<String, String> configs = new HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName))); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, segmentZKMetadata.getDownloadUrl()); + configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); + configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segmentZKMetadata.getCrc())); + pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs)); + tableNumTasks++; + } + + LOGGER.info("Finished generating {} task configs for table: {} " + "for task: {}", tableNumTasks, + tableNameWithType, taskType); + } + + return pinotTaskConfigs; + } + + /** + * We need not refresh when there were no tableConfig or schema updates after the last time the segment was + * refreshed by this task. + * + * Note that segments newly created after the latest tableConfig/schema update will still need to be refreshed. This + * is because inverted index creation is disabled by default during segment generation. This can be added as an + * additional check in the future, if required. + */ + private boolean shouldRefreshSegment(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, Stat tableStat, + Stat schemaStat) { + String tableNameWithType = tableConfig.getTableName(); + String timestampKey = RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; + + long lastProcessedTime = 0L; + if (segmentZKMetadata.getCustomMap() != null && segmentZKMetadata.getCustomMap().containsKey(timestampKey)) { + lastProcessedTime = MinionTaskUtils.fromUTCString(segmentZKMetadata.getCustomMap().get(timestampKey)); + } + + if (tableStat == null || schemaStat == null) { + LOGGER.warn("Table or schema stat is null for table: {}", tableNameWithType); + return false; + } + + long tableMTime = tableStat.getMtime(); + long schemaMTime = schemaStat.getMtime(); + +// TODO: See comment above - add this later if required. 
+// boolean segmentCreatedBeforeUpdate = +// tableMTime > segmentZKMetadata.getCreationTime() || schemaMTime > segmentZKMetadata.getCreationTime(); + + boolean segmentProcessedBeforeUpdate = tableMTime > lastProcessedTime || schemaMTime > lastProcessedTime; + return segmentProcessedBeforeUpdate; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskProgressObserverFactory.java similarity index 59% copy from pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java copy to pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskProgressObserverFactory.java index ea944cc39c..b10db94901 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskProgressObserverFactory.java @@ -16,30 +16,18 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.pinot.plugin.minion.tasks.refreshsegment; -package org.apache.pinot.spi.config.table; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory; +import org.apache.pinot.spi.annotations.minion.EventObserverFactory; -import com.fasterxml.jackson.annotation.JsonProperty; +@EventObserverFactory +public class RefreshSegmentTaskProgressObserverFactory extends BaseMinionProgressObserverFactory { -/* - * Container object for metadata info / stats of Pinot tables - */ -public class TableStats { - public static final String CREATION_TIME_KEY = "creationTime"; - - private String _creationTime; - - public TableStats(String creationTime) { - _creationTime = creationTime; - } - - @JsonProperty(CREATION_TIME_KEY) - public String getCreationTime() { - return _creationTime; - } - - public void setCreationTime(String creationTime) { - _creationTime = creationTime; + @Override + public String getTaskType() { + return MinionConstants.RefreshSegmentTask.TASK_TYPE; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatsHumanReadable.java similarity index 87% rename from pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatsHumanReadable.java index ea944cc39c..1c133f2868 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStats.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatsHumanReadable.java @@ -23,14 +23,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; /* - * Container object for metadata info / stats of Pinot tables + * Container object for human-readable metadata info / stats of Pinot tables */ -public class TableStats { +public class TableStatsHumanReadable { public static final String CREATION_TIME_KEY = "creationTime"; private String _creationTime; - public TableStats(String creationTime) { + public TableStatsHumanReadable(String creationTime) { _creationTime = creationTime; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional 
commands, e-mail: commits-h...@pinot.apache.org
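
For readers who want to try the new task, here is a minimal, hedged sketch of (a) opting a table into RefreshSegmentTask, mirroring what the integration test's getRefreshSegmentTaskConfig() does, and (b) the round trip of the UTC helpers added to MinionTaskUtils, which the executor and generator use to stamp and compare last-processed times. It uses only APIs that appear in the diff above; the class name and the "10" per-run cap are illustrative assumptions, not part of the commit.

import java.util.Collections;
import java.util.Map;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;

public class RefreshSegmentTaskUsageSketch {

  // Enable RefreshSegmentTask on a table. The generator reads TABLE_MAX_NUM_TASKS_KEY
  // from the per-task config map to override the default cap of
  // RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE (20) tasks per table per run.
  public static void enableRefreshTask(TableConfig tableConfig) {
    Map<String, String> taskConfigs =
        Collections.singletonMap(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, "10"); // illustrative cap
    tableConfig.setTaskConfig(
        new TableTaskConfig(Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE, taskConfigs)));
  }

  public static void main(String[] args) {
    // The executor stamps the segment's ZK custom map with toUTCString(taskStartTime);
    // the generator parses it back with fromUTCString() and compares it against the
    // table config and schema ZK mtimes to decide whether a refresh is needed.
    long taskStartTime = System.currentTimeMillis();
    String stamped = MinionTaskUtils.toUTCString(taskStartTime);
    long lastProcessedTime = MinionTaskUtils.fromUTCString(stamped);
    // The yyyy-MM-dd'T'HH:mm:ss.SSS'Z' pattern keeps millisecond precision, so the
    // round trip is lossless.
    assert lastProcessedTime == taskStartTime;
  }
}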