This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e63ef628f3 Move the logic to modify default value for consuming segment inside RealtimeTableDataManager (#10086) e63ef628f3 is described below commit e63ef628f32578af3197392b7e94afe7d91f5983 Author: Saurabh Dubey <saurabhd...@gmail.com> AuthorDate: Tue Jan 10 22:30:38 2023 +0530 Move the logic to modify default value for consuming segment inside RealtimeTableDataManager (#10086) Co-authored-by: Saurabh Dubey <saurabh.dubey@Saurabhs-MacBook-Pro.local> --- .../core/data/manager/BaseTableDataManager.java | 2 +- .../manager/realtime/RealtimeTableDataManager.java | 37 +++++++++++++ .../realtime/RealtimeTableDataManagerTest.java | 28 ++++++++++ .../starter/helix/HelixInstanceDataManager.java | 40 -------------- .../helix/HelixInstanceDataManagerTest.java | 63 ---------------------- 5 files changed, 66 insertions(+), 104 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index ac8a717670..ff5a7693f6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -81,7 +81,7 @@ import org.slf4j.LoggerFactory; @ThreadSafe public abstract class BaseTableDataManager implements TableDataManager { - private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDataManager.class); + protected static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDataManager.class); protected final ConcurrentHashMap<String, SegmentDataManager> _segmentDataManagerMap = new ConcurrentHashMap<>(); // Semaphore to restrict the maximum number of parallel segment downloads for a table. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index fb53de64ea..0527aac163 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.data.manager.realtime; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; @@ -37,6 +38,7 @@ import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.Utils; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; @@ -72,10 +74,13 @@ import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; +import org.apache.pinot.spi.utils.TimeUtils; import static org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD; @@ -390,6 +395,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { throw new RuntimeException("Mismatching schema/table config for " + _tableNameWithType); } VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, segmentName); + setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata); if (!isHLCSegment) { // Generates only one semaphore for every partitionGroupId @@ -429,6 +435,37 @@ public class RealtimeTableDataManager extends BaseTableDataManager { _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L); } + /** + * Sets the default time value in the schema as the segment creation time if it is invalid. Time column is used to + * manage the segments, so its values have to be within the valid range. + */ + @VisibleForTesting + static void setDefaultTimeValueIfInvalid(TableConfig tableConfig, Schema schema, SegmentZKMetadata zkMetadata) { + String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); + if (StringUtils.isEmpty(timeColumnName)) { + return; + } + DateTimeFieldSpec timeColumnSpec = schema.getSpecForTimeColumn(timeColumnName); + Preconditions.checkState(timeColumnSpec != null, "Failed to find time field: %s from schema: %s", timeColumnName, + schema.getSchemaName()); + String defaultTimeString = timeColumnSpec.getDefaultNullValueString(); + DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec(); + try { + long defaultTimeMs = dateTimeFormatSpec.fromFormatToMillis(defaultTimeString); + if (TimeUtils.timeValueInValidRange(defaultTimeMs)) { + return; + } + } catch (Exception e) { + // Ignore + } + String creationTimeString = dateTimeFormatSpec.fromMillisToFormat(zkMetadata.getCreationTime()); + Object creationTime = timeColumnSpec.getDataType().convert(creationTimeString); + timeColumnSpec.setDefaultNullValue(creationTime); + LOGGER.info( + "Default time: {} does not comply with format: {}, using creation time: {} as the default time for table: {}", + defaultTimeString, timeColumnSpec.getFormat(), creationTime, tableConfig.getTableName()); + } + @Override public void addSegment(ImmutableSegment immutableSegment) { if (isUpsertEnabled()) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java index dbb4d0a86d..392e562fac 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java @@ -45,6 +45,7 @@ import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -52,6 +53,8 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.util.TestUtils; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -61,6 +64,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -221,4 +225,28 @@ public class RealtimeTableDataManagerTest { AccessOption.PERSISTENT)).thenReturn(schemaZNRecord); return schema; } + + @Test + public void testSetDefaultTimeValueIfInvalid() { + SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class); + long currentTimeMs = System.currentTimeMillis(); + when(segmentZKMetadata.getCreationTime()).thenReturn(currentTimeMs); + + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("timeColumn").build(); + Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable") + .addDateTime("timeColumn", FieldSpec.DataType.TIMESTAMP, "TIMESTAMP", "1:MILLISECONDS").build(); + RealtimeTableDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata); + DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn("timeColumn"); + assertNotNull(timeFieldSpec); + assertEquals(timeFieldSpec.getDefaultNullValue(), currentTimeMs); + + schema = new Schema.SchemaBuilder().setSchemaName("testTable") + .addDateTime("timeColumn", FieldSpec.DataType.INT, "SIMPLE_DATE_FORMAT|yyyyMMdd", "1:DAYS").build(); + RealtimeTableDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata); + timeFieldSpec = schema.getSpecForTimeColumn("timeColumn"); + assertNotNull(timeFieldSpec); + assertEquals(timeFieldSpec.getDefaultNullValue(), + Integer.parseInt(DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(currentTimeMs))); + } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 25abdfc9d7..a52d627aac 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.server.starter.helix; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -40,7 +39,6 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.HelixManager; import org.apache.helix.model.ExternalView; @@ -67,11 +65,8 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.data.DateTimeFieldSpec; -import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -190,43 +185,11 @@ public class HelixInstanceDataManager implements InstanceDataManager { ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, realtimeTableName, segmentName); Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName, realtimeTableName); - setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata); _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> createTableDataManager(k, tableConfig)) .addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), zkMetadata); LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName); } - /** - * Sets the default time value in the schema as the segment creation time if it is invalid. Time column is used to - * manage the segments, so its values have to be within the valid range. - */ - @VisibleForTesting - static void setDefaultTimeValueIfInvalid(TableConfig tableConfig, Schema schema, SegmentZKMetadata zkMetadata) { - String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); - if (StringUtils.isEmpty(timeColumnName)) { - return; - } - DateTimeFieldSpec timeColumnSpec = schema.getSpecForTimeColumn(timeColumnName); - Preconditions.checkState(timeColumnSpec != null, "Failed to find time field: %s from schema: %s", timeColumnName, - schema.getSchemaName()); - String defaultTimeString = timeColumnSpec.getDefaultNullValueString(); - DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec(); - try { - long defaultTimeMs = dateTimeFormatSpec.fromFormatToMillis(defaultTimeString); - if (TimeUtils.timeValueInValidRange(defaultTimeMs)) { - return; - } - } catch (Exception e) { - // Ignore - } - String creationTimeString = dateTimeFormatSpec.fromMillisToFormat(zkMetadata.getCreationTime()); - Object creationTime = timeColumnSpec.getDataType().convert(creationTimeString); - timeColumnSpec.setDefaultNullValue(creationTime); - LOGGER.info( - "Default time: {} does not comply with format: {}, using creation time: {} as the default time for table: {}", - defaultTimeString, timeColumnSpec.getFormat(), creationTime, tableConfig.getTableName()); - } - private TableDataManager createTableDataManager(String tableNameWithType, TableConfig tableConfig) { LOGGER.info("Creating table data manager for table: {}", tableNameWithType); TableDataManagerConfig tableDataManagerConfig = new TableDataManagerConfig(_instanceDataManagerConfig, tableConfig); @@ -476,9 +439,6 @@ public class HelixInstanceDataManager implements InstanceDataManager { ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName); Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName, tableNameWithType); - if (schema != null) { - setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata); - } // This method might modify the file on disk. Use segment lock to prevent race condition Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName); diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java deleted file mode 100644 index f6351c505e..0000000000 --- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.server.starter.helix; - -import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.data.DateTimeFieldSpec; -import org.apache.pinot.spi.data.FieldSpec.DataType; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.testng.annotations.Test; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; - - -public class HelixInstanceDataManagerTest { - - @Test - public void testSetDefaultTimeValueIfInvalid() { - SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class); - long currentTimeMs = System.currentTimeMillis(); - when(segmentZKMetadata.getCreationTime()).thenReturn(currentTimeMs); - - TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("timeColumn").build(); - Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable") - .addDateTime("timeColumn", DataType.TIMESTAMP, "TIMESTAMP", "1:MILLISECONDS").build(); - HelixInstanceDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata); - DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn("timeColumn"); - assertNotNull(timeFieldSpec); - assertEquals(timeFieldSpec.getDefaultNullValue(), currentTimeMs); - - schema = new Schema.SchemaBuilder().setSchemaName("testTable") - .addDateTime("timeColumn", DataType.INT, "SIMPLE_DATE_FORMAT|yyyyMMdd", "1:DAYS").build(); - HelixInstanceDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata); - timeFieldSpec = schema.getSpecForTimeColumn("timeColumn"); - assertNotNull(timeFieldSpec); - assertEquals(timeFieldSpec.getDefaultNullValue(), - Integer.parseInt(DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(currentTimeMs))); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org