This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e63ef628f3 Move the logic to modify default value for consuming segment inside RealtimeTableDataManager (#10086)
e63ef628f3 is described below

commit e63ef628f32578af3197392b7e94afe7d91f5983
Author: Saurabh Dubey <saurabhd...@gmail.com>
AuthorDate: Tue Jan 10 22:30:38 2023 +0530

    Move the logic to modify default value for consuming segment inside RealtimeTableDataManager (#10086)
    
    Co-authored-by: Saurabh Dubey <saurabh.dubey@Saurabhs-MacBook-Pro.local>
---
 .../core/data/manager/BaseTableDataManager.java    |  2 +-
 .../manager/realtime/RealtimeTableDataManager.java | 37 +++++++++++++
 .../realtime/RealtimeTableDataManagerTest.java     | 28 ++++++++++
 .../starter/helix/HelixInstanceDataManager.java    | 40 --------------
 .../helix/HelixInstanceDataManagerTest.java        | 63 ----------------------
 5 files changed, 66 insertions(+), 104 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index ac8a717670..ff5a7693f6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -81,7 +81,7 @@ import org.slf4j.LoggerFactory;
 
 @ThreadSafe
 public abstract class BaseTableDataManager implements TableDataManager {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseTableDataManager.class);
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(BaseTableDataManager.class);
 
   protected final ConcurrentHashMap<String, SegmentDataManager> 
_segmentDataManagerMap = new ConcurrentHashMap<>();
   // Semaphore to restrict the maximum number of parallel segment downloads 
for a table.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index fb53de64ea..0527aac163 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.data.manager.realtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
@@ -37,6 +38,7 @@ import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
@@ -72,10 +74,13 @@ import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.spi.utils.TimeUtils;
 
 import static 
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
 
@@ -390,6 +395,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       throw new RuntimeException("Mismatching schema/table config for " + 
_tableNameWithType);
     }
     
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, 
segmentName);
+    setDefaultTimeValueIfInvalid(tableConfig, schema, segmentZKMetadata);
 
     if (!isHLCSegment) {
       // Generates only one semaphore for every partitionGroupId
@@ -429,6 +435,37 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.SEGMENT_COUNT, 1L);
   }
 
+  /**
+   * Sets the default time value in the schema as the segment creation time if 
it is invalid. Time column is used to
+   * manage the segments, so its values have to be within the valid range.
+   */
+  @VisibleForTesting
+  static void setDefaultTimeValueIfInvalid(TableConfig tableConfig, Schema 
schema, SegmentZKMetadata zkMetadata) {
+    String timeColumnName = 
tableConfig.getValidationConfig().getTimeColumnName();
+    if (StringUtils.isEmpty(timeColumnName)) {
+      return;
+    }
+    DateTimeFieldSpec timeColumnSpec = 
schema.getSpecForTimeColumn(timeColumnName);
+    Preconditions.checkState(timeColumnSpec != null, "Failed to find time 
field: %s from schema: %s", timeColumnName,
+        schema.getSchemaName());
+    String defaultTimeString = timeColumnSpec.getDefaultNullValueString();
+    DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec();
+    try {
+      long defaultTimeMs = 
dateTimeFormatSpec.fromFormatToMillis(defaultTimeString);
+      if (TimeUtils.timeValueInValidRange(defaultTimeMs)) {
+        return;
+      }
+    } catch (Exception e) {
+      // Ignore
+    }
+    String creationTimeString = 
dateTimeFormatSpec.fromMillisToFormat(zkMetadata.getCreationTime());
+    Object creationTime = 
timeColumnSpec.getDataType().convert(creationTimeString);
+    timeColumnSpec.setDefaultNullValue(creationTime);
+    LOGGER.info(
+        "Default time: {} does not comply with format: {}, using creation 
time: {} as the default time for table: {}",
+        defaultTimeString, timeColumnSpec.getFormat(), creationTime, 
tableConfig.getTableName());
+  }
+
   @Override
   public void addSegment(ImmutableSegment immutableSegment) {
     if (isUpsertEnabled()) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index dbb4d0a86d..392e562fac 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -52,6 +53,8 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -61,6 +64,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -221,4 +225,28 @@ public class RealtimeTableDataManagerTest {
         AccessOption.PERSISTENT)).thenReturn(schemaZNRecord);
     return schema;
   }
+
+  @Test
+  public void testSetDefaultTimeValueIfInvalid() {
+    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+    long currentTimeMs = System.currentTimeMillis();
+    when(segmentZKMetadata.getCreationTime()).thenReturn(currentTimeMs);
+
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("timeColumn").build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable")
+        .addDateTime("timeColumn", FieldSpec.DataType.TIMESTAMP, "TIMESTAMP", 
"1:MILLISECONDS").build();
+    RealtimeTableDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, 
segmentZKMetadata);
+    DateTimeFieldSpec timeFieldSpec = 
schema.getSpecForTimeColumn("timeColumn");
+    assertNotNull(timeFieldSpec);
+    assertEquals(timeFieldSpec.getDefaultNullValue(), currentTimeMs);
+
+    schema = new Schema.SchemaBuilder().setSchemaName("testTable")
+        .addDateTime("timeColumn", FieldSpec.DataType.INT, 
"SIMPLE_DATE_FORMAT|yyyyMMdd", "1:DAYS").build();
+    RealtimeTableDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, 
segmentZKMetadata);
+    timeFieldSpec = schema.getSpecForTimeColumn("timeColumn");
+    assertNotNull(timeFieldSpec);
+    assertEquals(timeFieldSpec.getDefaultNullValue(),
+        
Integer.parseInt(DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(currentTimeMs)));
+  }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 25abdfc9d7..a52d627aac 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.server.starter.helix;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -40,7 +39,6 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.ExternalView;
@@ -67,11 +65,8 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -190,43 +185,11 @@ public class HelixInstanceDataManager implements InstanceDataManager {
         ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
realtimeTableName, segmentName);
     Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata 
for segment: %s, table: %s", segmentName,
         realtimeTableName);
-    setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata);
     _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> 
createTableDataManager(k, tableConfig))
         .addSegment(segmentName, new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), 
zkMetadata);
     LOGGER.info("Added segment: {} to table: {}", segmentName, 
realtimeTableName);
   }
 
-  /**
-   * Sets the default time value in the schema as the segment creation time if 
it is invalid. Time column is used to
-   * manage the segments, so its values have to be within the valid range.
-   */
-  @VisibleForTesting
-  static void setDefaultTimeValueIfInvalid(TableConfig tableConfig, Schema 
schema, SegmentZKMetadata zkMetadata) {
-    String timeColumnName = 
tableConfig.getValidationConfig().getTimeColumnName();
-    if (StringUtils.isEmpty(timeColumnName)) {
-      return;
-    }
-    DateTimeFieldSpec timeColumnSpec = 
schema.getSpecForTimeColumn(timeColumnName);
-    Preconditions.checkState(timeColumnSpec != null, "Failed to find time 
field: %s from schema: %s", timeColumnName,
-        schema.getSchemaName());
-    String defaultTimeString = timeColumnSpec.getDefaultNullValueString();
-    DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec();
-    try {
-      long defaultTimeMs = 
dateTimeFormatSpec.fromFormatToMillis(defaultTimeString);
-      if (TimeUtils.timeValueInValidRange(defaultTimeMs)) {
-        return;
-      }
-    } catch (Exception e) {
-      // Ignore
-    }
-    String creationTimeString = 
dateTimeFormatSpec.fromMillisToFormat(zkMetadata.getCreationTime());
-    Object creationTime = 
timeColumnSpec.getDataType().convert(creationTimeString);
-    timeColumnSpec.setDefaultNullValue(creationTime);
-    LOGGER.info(
-        "Default time: {} does not comply with format: {}, using creation 
time: {} as the default time for table: {}",
-        defaultTimeString, timeColumnSpec.getFormat(), creationTime, 
tableConfig.getTableName());
-  }
-
   private TableDataManager createTableDataManager(String tableNameWithType, 
TableConfig tableConfig) {
     LOGGER.info("Creating table data manager for table: {}", 
tableNameWithType);
     TableDataManagerConfig tableDataManagerConfig = new 
TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
@@ -476,9 +439,6 @@ public class HelixInstanceDataManager implements InstanceDataManager {
         ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
tableNameWithType, segmentName);
     Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata 
for segment: %s, table: %s", segmentName,
         tableNameWithType);
-    if (schema != null) {
-      setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata);
-    }
 
     // This method might modify the file on disk. Use segment lock to prevent 
race condition
     Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, 
segmentName);
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
deleted file mode 100644
index f6351c505e..0000000000
--- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.server.starter.helix;
-
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-
-public class HelixInstanceDataManagerTest {
-
-  @Test
-  public void testSetDefaultTimeValueIfInvalid() {
-    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
-    long currentTimeMs = System.currentTimeMillis();
-    when(segmentZKMetadata.getCreationTime()).thenReturn(currentTimeMs);
-
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("timeColumn").build();
-    Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable")
-        .addDateTime("timeColumn", DataType.TIMESTAMP, "TIMESTAMP", 
"1:MILLISECONDS").build();
-    HelixInstanceDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, 
segmentZKMetadata);
-    DateTimeFieldSpec timeFieldSpec = 
schema.getSpecForTimeColumn("timeColumn");
-    assertNotNull(timeFieldSpec);
-    assertEquals(timeFieldSpec.getDefaultNullValue(), currentTimeMs);
-
-    schema = new Schema.SchemaBuilder().setSchemaName("testTable")
-        .addDateTime("timeColumn", DataType.INT, 
"SIMPLE_DATE_FORMAT|yyyyMMdd", "1:DAYS").build();
-    HelixInstanceDataManager.setDefaultTimeValueIfInvalid(tableConfig, schema, 
segmentZKMetadata);
-    timeFieldSpec = schema.getSpecForTimeColumn("timeColumn");
-    assertNotNull(timeFieldSpec);
-    assertEquals(timeFieldSpec.getDefaultNullValue(),
-        
Integer.parseInt(DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(currentTimeMs)));
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to