This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ac5cd0baf5 Use the same default time value for all replicas (#10029)
ac5cd0baf5 is described below

commit ac5cd0baf54436580deb6a6d57f0bf7e6adc9f03
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Sat Dec 24 11:44:31 2022 -0800

    Use the same default time value for all replicas (#10029)
---
 .../core/data/manager/BaseTableDataManager.java    |  2 +-
 .../manager/realtime/RealtimeTableDataManager.java | 11 ++--
 .../realtime/RealtimeTableDataManagerTest.java     |  8 +--
 .../local/data/manager/TableDataManager.java       |  3 +-
 .../starter/helix/HelixInstanceDataManager.java    | 46 +++++++++++++++-
 .../helix/HelixInstanceDataManagerTest.java        | 63 ++++++++++++++++++++++
 6 files changed, 116 insertions(+), 17 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 1111a2d48b..ac8a717670 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -223,7 +223,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   }
 
   @Override
-  public void addSegment(String segmentName, TableConfig tableConfig, 
IndexLoadingConfig indexLoadingConfig)
+  public void addSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig, SegmentZKMetadata zkMetadata)
       throws Exception {
     throw new UnsupportedOperationException();
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index ee29271b15..89b45f1cce 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -273,7 +273,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
    *   to start consuming or download the segment.
    */
   @Override
-  public void addSegment(String segmentName, TableConfig tableConfig, 
IndexLoadingConfig indexLoadingConfig)
+  public void addSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig, SegmentZKMetadata segmentZKMetadata)
       throws Exception {
     SegmentDataManager segmentDataManager = 
_segmentDataManagerMap.get(segmentName);
     if (segmentDataManager != null) {
@@ -282,18 +282,15 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       return;
     }
 
-    SegmentZKMetadata segmentZKMetadata =
-        ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
_tableNameWithType, segmentName);
-    Preconditions.checkNotNull(segmentZKMetadata);
-    Schema schema = indexLoadingConfig.getSchema();
-    Preconditions.checkNotNull(schema);
-
     File segmentDir = new File(_indexDir, segmentName);
     // Restart during segment reload might leave segment in inconsistent state 
(index directory might not exist but
     // segment backup directory existed), need to first try to recover from 
reload failure before checking the existence
     // of the index directory and loading segment from it
     LoaderUtils.reloadFailureRecovery(segmentDir);
 
+    TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+    Schema schema = indexLoadingConfig.getSchema();
+    assert schema != null;
     boolean isHLCSegment = 
SegmentName.isHighLevelConsumerSegmentName(segmentName);
     if (segmentZKMetadata.getStatus().isCompleted()) {
       if (isHLCSegment && !segmentDir.exists()) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index f9eeb6a536..dbb4d0a86d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -106,8 +106,6 @@ public class RealtimeTableDataManagerTest {
     File localSegDir = createSegment(tableConfig, schema, segName);
     long segCrc = TableDataManagerTestUtils.getCRC(localSegDir, 
SegmentVersion.v3);
     segmentZKMetadata.setCrc(segCrc);
-    
when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSegment(TABLE_NAME_WITH_TYPE,
 segName), null,
-        AccessOption.PERSISTENT)).thenReturn(segmentZKMetadata.toZNRecord());
 
     // Move the segment to the backup location.
     File backup = new File(TABLE_DATA_DIR, segName + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
@@ -116,7 +114,7 @@ public class RealtimeTableDataManagerTest {
     assertFalse(localSegDir.exists());
     IndexLoadingConfig indexLoadingConfig =
         TableDataManagerTestUtils.createIndexLoadingConfig("default", 
tableConfig, schema);
-    tmgr.addSegment(segName, tableConfig, indexLoadingConfig);
+    tmgr.addSegment(segName, indexLoadingConfig, segmentZKMetadata);
     // Segment data is put back the default location, and backup location is 
deleted.
     assertTrue(localSegDir.exists());
     assertFalse(backup.exists());
@@ -142,15 +140,13 @@ public class RealtimeTableDataManagerTest {
         TableDataManagerTestUtils.makeRawSegment(segName, 
createSegment(tableConfig, schema, segName),
             new File(TEMP_DIR, segName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), true);
     segmentZKMetadata.setStatus(Status.DONE);
-    
when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSegment(TABLE_NAME_WITH_TYPE,
 segName), null,
-        AccessOption.PERSISTENT)).thenReturn(segmentZKMetadata.toZNRecord());
 
     // Local segment dir doesn't exist, thus downloading from deep store.
     File localSegDir = new File(TABLE_DATA_DIR, segName);
     assertFalse(localSegDir.exists());
     IndexLoadingConfig indexLoadingConfig =
         TableDataManagerTestUtils.createIndexLoadingConfig("default", 
tableConfig, schema);
-    tmgr.addSegment(segName, tableConfig, indexLoadingConfig);
+    tmgr.addSegment(segName, indexLoadingConfig, segmentZKMetadata);
     // Segment data is put on default location.
     assertTrue(localSegDir.exists());
     SegmentMetadataImpl llmd = new SegmentMetadataImpl(new 
File(TABLE_DATA_DIR, segName));
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index e33cf6a743..a316669277 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -34,7 +34,6 @@ import 
org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -78,7 +77,7 @@ public interface TableDataManager {
    * Adds a segment into the REALTIME table.
    * <p>The segment could be committed or under consuming.
    */
-  void addSegment(String segmentName, TableConfig tableConfig, 
IndexLoadingConfig indexLoadingConfig)
+  void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, 
SegmentZKMetadata zkMetadata)
       throws Exception;
 
   /**
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index fe938648b0..a254009742 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.server.starter.helix;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -38,6 +39,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.ExternalView;
@@ -64,8 +66,11 @@ import 
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -174,11 +179,47 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", realtimeTableName);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
tableConfig);
     Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", realtimeTableName);
+    SegmentZKMetadata zkMetadata =
+        ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
realtimeTableName, segmentName);
+    Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata 
for segment: %s, table: %s", segmentName,
+        realtimeTableName);
+    setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata);
     _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> 
createTableDataManager(k, tableConfig))
-        .addSegment(segmentName, tableConfig, new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema));
+        .addSegment(segmentName, new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), 
zkMetadata);
     LOGGER.info("Added segment: {} to table: {}", segmentName, 
realtimeTableName);
   }
 
+  /**
+   * Replaces the default null value of the table's time column with the segment creation time when
+   * the currently configured default is not usable. Time column is used to manage the segments, so
+   * its values have to be within the valid range.
+   *
+   * <p>Nothing is changed when the table config has no time column name, or when the configured
+   * default parses with the column's format and passes {@code TimeUtils.timeValueInValidRange}.
+   * Otherwise the creation time from {@code zkMetadata} is rendered in the column's format,
+   * converted to the column's data type, and set on the time column spec obtained from
+   * {@code schema}, i.e. the passed-in schema is modified.
+   *
+   * @param tableConfig table config supplying the time column name (and the table name for logging)
+   * @param schema schema whose time column spec may get a new default null value
+   * @param zkMetadata segment ZK metadata supplying the creation time
+   * @throws IllegalStateException if the schema has no spec for the configured time column
+   */
+  @VisibleForTesting
+  static void setDefaultTimeValueIfInvalid(TableConfig tableConfig, Schema 
schema, SegmentZKMetadata zkMetadata) {
+    String timeColumnName = 
tableConfig.getValidationConfig().getTimeColumnName();
+    if (StringUtils.isEmpty(timeColumnName)) {
+      return;
+    }
+    DateTimeFieldSpec timeColumnSpec = 
schema.getSpecForTimeColumn(timeColumnName);
+    Preconditions.checkState(timeColumnSpec != null, "Failed to find time 
field: %s from schema: %s", timeColumnName,
+        schema.getSchemaName());
+    String defaultTimeString = timeColumnSpec.getDefaultNullValueString();
+    DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec();
+    try {
+      long defaultTimeMs = 
dateTimeFormatSpec.fromFormatToMillis(defaultTimeString);
+      if (TimeUtils.timeValueInValidRange(defaultTimeMs)) {
+        return;
+      }
+    } catch (Exception e) {
+      // Ignore: a default that cannot be parsed is handled the same way as one that is out of
+      // range, i.e. it is replaced with the creation time below
+    }
+    String creationTimeString = 
dateTimeFormatSpec.fromMillisToFormat(zkMetadata.getCreationTime());
+    Object creationTime = 
timeColumnSpec.getDataType().convert(creationTimeString);
+    timeColumnSpec.setDefaultNullValue(creationTime);
+    // NOTE: this is reached both when the default failed to parse and when it parsed but was
+    // outside the valid range, although the message only mentions the format
+    LOGGER.info(
+        "Default time: {} does not comply with format: {}, using creation 
time: {} as the default time for table: {}",
+        defaultTimeString, timeColumnSpec.getFormat(), creationTime, 
tableConfig.getTableName());
+  }
+
   private TableDataManager createTableDataManager(String tableNameWithType, 
TableConfig tableConfig) {
     LOGGER.info("Creating table data manager for table: {}", 
tableNameWithType);
     TableDataManagerConfig tableDataManagerConfig = new 
TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
@@ -428,6 +469,9 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
         ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
tableNameWithType, segmentName);
     Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata 
for segment: %s, table: %s", segmentName,
         tableNameWithType);
+    if (schema != null) {
+      setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata);
+    }
 
     // This method might modify the file on disk. Use segment lock to prevent 
race condition
     Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, 
segmentName);
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
new file mode 100644
index 0000000000..f6351c505e
--- /dev/null
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Unit test for {@code HelixInstanceDataManager.setDefaultTimeValueIfInvalid}, using a mocked
+ * {@code SegmentZKMetadata} that returns a fixed creation time.
+ */
+public class HelixInstanceDataManagerTest {
+
+  /**
+   * Checks that the time column's default null value is replaced with the segment creation time,
+   * expressed in the column's own format and data type, for two differently formatted time columns.
+   */
+  @Test
+  public void testSetDefaultTimeValueIfInvalid() {
+    // The creation time reported by the mocked ZK metadata is the expected replacement value
+    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+    long currentTimeMs = System.currentTimeMillis();
+    when(segmentZKMetadata.getCreationTime()).thenReturn(currentTimeMs);
+
+    // Case 1: TIMESTAMP column declared without an explicit default; the default null value is
+    // expected to become the creation time in millis
+    // (presumably the built-in default is outside the valid time range - TODO confirm)
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("timeColumn").build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable")
+        .addDateTime("timeColumn", DataType.TIMESTAMP, "TIMESTAMP", 
"1:MILLISECONDS").build();
+    HelixInstanceDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, 
segmentZKMetadata);
+    DateTimeFieldSpec timeFieldSpec = 
schema.getSpecForTimeColumn("timeColumn");
+    assertNotNull(timeFieldSpec);
+    assertEquals(timeFieldSpec.getDefaultNullValue(), currentTimeMs);
+
+    // Case 2: INT column in yyyyMMdd simple date format; the default null value is expected to
+    // become the creation time printed as yyyyMMdd in UTC and parsed as an int
+    schema = new Schema.SchemaBuilder().setSchemaName("testTable")
+        .addDateTime("timeColumn", DataType.INT, 
"SIMPLE_DATE_FORMAT|yyyyMMdd", "1:DAYS").build();
+    HelixInstanceDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, 
segmentZKMetadata);
+    timeFieldSpec = schema.getSpecForTimeColumn("timeColumn");
+    assertNotNull(timeFieldSpec);
+    assertEquals(timeFieldSpec.getDefaultNullValue(),
+        
Integer.parseInt(DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(currentTimeMs)));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to