This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ac5cd0baf5 Use the same default time value for all replicas (#10029) ac5cd0baf5 is described below commit ac5cd0baf54436580deb6a6d57f0bf7e6adc9f03 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Sat Dec 24 11:44:31 2022 -0800 Use the same default time value for all replicas (#10029) --- .../core/data/manager/BaseTableDataManager.java | 2 +- .../manager/realtime/RealtimeTableDataManager.java | 11 ++-- .../realtime/RealtimeTableDataManagerTest.java | 8 +-- .../local/data/manager/TableDataManager.java | 3 +- .../starter/helix/HelixInstanceDataManager.java | 46 +++++++++++++++- .../helix/HelixInstanceDataManagerTest.java | 63 ++++++++++++++++++++++ 6 files changed, 116 insertions(+), 17 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 1111a2d48b..ac8a717670 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -223,7 +223,7 @@ public abstract class BaseTableDataManager implements TableDataManager { } @Override - public void addSegment(String segmentName, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig) + public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata) throws Exception { throw new UnsupportedOperationException(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index ee29271b15..89b45f1cce 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -273,7 +273,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { * to start consuming or download the segment. */ @Override - public void addSegment(String segmentName, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig) + public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata segmentZKMetadata) throws Exception { SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); if (segmentDataManager != null) { @@ -282,18 +282,15 @@ public class RealtimeTableDataManager extends BaseTableDataManager { return; } - SegmentZKMetadata segmentZKMetadata = - ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, _tableNameWithType, segmentName); - Preconditions.checkNotNull(segmentZKMetadata); - Schema schema = indexLoadingConfig.getSchema(); - Preconditions.checkNotNull(schema); - File segmentDir = new File(_indexDir, segmentName); // Restart during segment reload might leave segment in inconsistent state (index directory might not exist but // segment backup directory existed), need to first try to recover from reload failure before checking the existence // of the index directory and loading segment from it LoaderUtils.reloadFailureRecovery(segmentDir); + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + Schema schema = indexLoadingConfig.getSchema(); + assert schema != null; boolean isHLCSegment = SegmentName.isHighLevelConsumerSegmentName(segmentName); if (segmentZKMetadata.getStatus().isCompleted()) { if (isHLCSegment && !segmentDir.exists()) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java index f9eeb6a536..dbb4d0a86d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java @@ -106,8 +106,6 @@ public class RealtimeTableDataManagerTest { File localSegDir = createSegment(tableConfig, schema, segName); long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, SegmentVersion.v3); segmentZKMetadata.setCrc(segCrc); - when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSegment(TABLE_NAME_WITH_TYPE, segName), null, - AccessOption.PERSISTENT)).thenReturn(segmentZKMetadata.toZNRecord()); // Move the segment to the backup location. File backup = new File(TABLE_DATA_DIR, segName + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX); @@ -116,7 +114,7 @@ public class RealtimeTableDataManagerTest { assertFalse(localSegDir.exists()); IndexLoadingConfig indexLoadingConfig = TableDataManagerTestUtils.createIndexLoadingConfig("default", tableConfig, schema); - tmgr.addSegment(segName, tableConfig, indexLoadingConfig); + tmgr.addSegment(segName, indexLoadingConfig, segmentZKMetadata); // Segment data is put back the default location, and backup location is deleted. assertTrue(localSegDir.exists()); assertFalse(backup.exists()); @@ -142,15 +140,13 @@ public class RealtimeTableDataManagerTest { TableDataManagerTestUtils.makeRawSegment(segName, createSegment(tableConfig, schema, segName), new File(TEMP_DIR, segName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), true); segmentZKMetadata.setStatus(Status.DONE); - when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSegment(TABLE_NAME_WITH_TYPE, segName), null, - AccessOption.PERSISTENT)).thenReturn(segmentZKMetadata.toZNRecord()); // Local segment dir doesn't exist, thus downloading from deep store. File localSegDir = new File(TABLE_DATA_DIR, segName); assertFalse(localSegDir.exists()); IndexLoadingConfig indexLoadingConfig = TableDataManagerTestUtils.createIndexLoadingConfig("default", tableConfig, schema); - tmgr.addSegment(segName, tableConfig, indexLoadingConfig); + tmgr.addSegment(segName, indexLoadingConfig, segmentZKMetadata); // Segment data is put on default location. assertTrue(localSegDir.exists()); SegmentMetadataImpl llmd = new SegmentMetadataImpl(new File(TABLE_DATA_DIR, segName)); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index e33cf6a743..a316669277 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -34,7 +34,6 @@ import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -78,7 +77,7 @@ public interface TableDataManager { * Adds a segment into the REALTIME table. * <p>The segment could be committed or under consuming. */ - void addSegment(String segmentName, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig) + void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata) throws Exception; /** diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index fe938648b0..a254009742 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.server.starter.helix; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -38,6 +39,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.HelixManager; import org.apache.helix.model.ExternalView; @@ -64,8 +66,11 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,11 +179,47 @@ public class HelixInstanceDataManager implements InstanceDataManager { Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", realtimeTableName); Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig); Preconditions.checkState(schema != null, "Failed to find schema for table: %s", realtimeTableName); + SegmentZKMetadata zkMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, realtimeTableName, segmentName); + Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName, + realtimeTableName); + setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata); _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> createTableDataManager(k, tableConfig)) - .addSegment(segmentName, tableConfig, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema)); + .addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), zkMetadata); LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName); } + /** + * Sets the default time value in the schema as the segment creation time if it is invalid. Time column is used to + * manage the segments, so its values have to be within the valid range. + */ + @VisibleForTesting + static void setDefaultTimeValueIfInvalid(TableConfig tableConfig, Schema schema, SegmentZKMetadata zkMetadata) { + String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); + if (StringUtils.isEmpty(timeColumnName)) { + return; + } + DateTimeFieldSpec timeColumnSpec = schema.getSpecForTimeColumn(timeColumnName); + Preconditions.checkState(timeColumnSpec != null, "Failed to find time field: %s from schema: %s", timeColumnName, + schema.getSchemaName()); + String defaultTimeString = timeColumnSpec.getDefaultNullValueString(); + DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec(); + try { + long defaultTimeMs = dateTimeFormatSpec.fromFormatToMillis(defaultTimeString); + if (TimeUtils.timeValueInValidRange(defaultTimeMs)) { + return; + } + } catch (Exception e) { + // Ignore + } + String creationTimeString = dateTimeFormatSpec.fromMillisToFormat(zkMetadata.getCreationTime()); + Object creationTime = timeColumnSpec.getDataType().convert(creationTimeString); + timeColumnSpec.setDefaultNullValue(creationTime); + LOGGER.info( + "Default time: {} does not comply with format: {}, using creation time: {} as the default time for table: {}", + defaultTimeString, timeColumnSpec.getFormat(), creationTime, tableConfig.getTableName()); + } + private TableDataManager createTableDataManager(String tableNameWithType, TableConfig tableConfig) { LOGGER.info("Creating table data manager for table: {}", tableNameWithType); TableDataManagerConfig tableDataManagerConfig = new TableDataManagerConfig(_instanceDataManagerConfig, tableConfig); @@ -428,6 +469,9 @@ public class HelixInstanceDataManager implements InstanceDataManager { ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName); Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName, tableNameWithType); + if (schema != null) { + setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata); + } // This method might modify the file on disk. Use segment lock to prevent race condition Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName); diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java new file mode 100644 index 0000000000..f6351c505e --- /dev/null +++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.starter.helix; + +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + + +public class HelixInstanceDataManagerTest { + + @Test + public void testSetDefaultTimeValueIfInvalid() { + SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class); + long currentTimeMs = System.currentTimeMillis(); + when(segmentZKMetadata.getCreationTime()).thenReturn(currentTimeMs); + + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("timeColumn").build(); + Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable") + .addDateTime("timeColumn", DataType.TIMESTAMP, "TIMESTAMP", "1:MILLISECONDS").build(); + HelixInstanceDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata); + DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn("timeColumn"); + assertNotNull(timeFieldSpec); + assertEquals(timeFieldSpec.getDefaultNullValue(), currentTimeMs); + + schema = new Schema.SchemaBuilder().setSchemaName("testTable") + .addDateTime("timeColumn", DataType.INT, "SIMPLE_DATE_FORMAT|yyyyMMdd", "1:DAYS").build(); + HelixInstanceDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata); + timeFieldSpec = schema.getSpecForTimeColumn("timeColumn"); + assertNotNull(timeFieldSpec); + assertEquals(timeFieldSpec.getDefaultNullValue(), + Integer.parseInt(DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(currentTimeMs))); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org