mayankshriv commented on a change in pull request #6094:
URL: https://github.com/apache/incubator-pinot/pull/6094#discussion_r500569900



##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -148,6 +153,23 @@ public static URI 
getRetrieveAllSegmentWithTableTypeHttpUri(String host, int por
         rawTableName + TYPE_DELIMITER + tableType));
   }
 
+  public static URI getStartReplaceSegmentsHttpURI(URI controllerURI, String 
rawTableName, String tableType)

Review comment:
       Could you please add Javadoc to the public methods? Also, will this 
support both HTTP and HTTPS?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import 
org.apache.pinot.controller.helix.core.minion.mergestrategy.MergeStrategyFactory;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentMergeRollupTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentMergeRollupTaskGenerator.class);
+
+  private static final int DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 20; // 20 
segments
+  private static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000; // 
5 million rows
+  private static final String DEFAULT_BUFFER_TIME_PERIOD = "14d"; // 2 weeks
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public SegmentMergeRollupTaskGenerator(ClusterInfoProvider 
clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.MergeRollupTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    // Get the segments that are being converted so that we don't submit them 
again
+    Map<String, List<String>> scheduledSegmentsMap =
+        
TaskGeneratorUtils.getScheduledSegmentsMap(MinionConstants.MergeRollupTask.TASK_TYPE,
 _clusterInfoProvider);
+
+    for (TableConfig tableConfig : tableConfigs) {
+      // Only generate tasks for OFFLINE tables
+      String offlineTableName = tableConfig.getTableName();
+      if (tableConfig.getTableType() != TableType.OFFLINE) {
+        LOGGER.warn("Skip generating MergeRollupTask for non-OFFLINE table: 
{}", offlineTableName);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkNotNull(tableTaskConfig);
+      Map<String, String> taskConfigs =
+          
tableTaskConfig.getConfigsForTaskType(MinionConstants.MergeRollupTask.TASK_TYPE);
+      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null 
for Table: {}", offlineTableName);
+
+      int tableMaxNumTasks =
+          readIntConfigWithDefaultValue(taskConfigs, 
MinionConstants.TABLE_MAX_NUM_TASKS_KEY,
+              Integer.MAX_VALUE);
+
+      int maxNumSegmentsPerTask =
+          readIntConfigWithDefaultValue(taskConfigs, 
MinionConstants.MergeRollupTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
+              DEFAULT_MAX_NUM_SEGMENTS_PER_TASK);
+
+      int maxNumRecordsPerSegment =
+          readIntConfigWithDefaultValue(taskConfigs, 
MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+              DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT);
+
+      String bufferTimePeriodStr =
+          
taskConfigs.getOrDefault(MinionConstants.MergeRollupTask.BUFFER_TIME_PERIOD_KEY,
 DEFAULT_BUFFER_TIME_PERIOD);
+      long bufferTimePeriodInMillis;
+      try {
+        bufferTimePeriodInMillis = 
TimeUtils.convertPeriodToMillis(bufferTimePeriodStr);
+      } catch (IllegalArgumentException e) {
+        LOGGER.warn(
+            "Buffer time period ('{}') for table '{}' is not configured 
correctly. Falling back to default behavior ('{}')",
+            bufferTimePeriodStr, offlineTableName, DEFAULT_BUFFER_TIME_PERIOD);
+        bufferTimePeriodInMillis = 
TimeUtils.convertPeriodToMillis(DEFAULT_BUFFER_TIME_PERIOD);
+      }
+
+      // Generate tasks
+      int tableNumTasks = 0;
+
+      List<OfflineSegmentZKMetadata> segmentsForOfflineTable =
+          _clusterInfoProvider.getOfflineSegmentsMetadata(offlineTableName);
+
+      // Fetch the segment lineage for the table and compute the segments that 
should not be scheduled for merge
+      // based on the segment lineage.
+      SegmentLineage segmentLineageForTable = 
_clusterInfoProvider.getSegmentLineage(offlineTableName);
+      Set<String> segmentsNotToMerge = new HashSet<>();
+      if (segmentLineageForTable != null) {
+        for (String segmentLineageEntryId : 
segmentLineageForTable.getLineageEntryIds()) {
+          LineageEntry lineageEntry = 
segmentLineageForTable.getLineageEntry(segmentLineageEntryId);
+          // Segments shows up on "segmentFrom" field in the lineage entry 
should not be scheduled again.
+          segmentsNotToMerge.addAll(lineageEntry.getSegmentsFrom());
+
+          // Segments shows up on "segmentsTo" field in the lineage entry with 
"IN_PROGRESS" state cannot be merged.
+          if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            segmentsNotToMerge.addAll(lineageEntry.getSegmentsTo());
+          }
+        }
+      }
+
+      // Filter out the segments that cannot be merged
+      List<SegmentZKMetadata> segmentsToMergeForTable = new ArrayList<>();
+      List<String> scheduledSegments = 
scheduledSegmentsMap.getOrDefault(offlineTableName, Collections.emptyList());
+      for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : 
segmentsForOfflineTable) {
+        String segmentName = offlineSegmentZKMetadata.getSegmentName();
+
+        // The segment should not be merged if it's already scheduled or in 
progress
+        if (scheduledSegments.contains(segmentName) || 
segmentsNotToMerge.contains(segmentName)) {
+          continue;
+        }
+
+        // The segment should not be merged if it already contains enough 
records

Review comment:
       The intention of this class is to support both merge and rollup, right? 
If so, do this check (and the other conditions) apply to both merge and rollup? If 
not, one option would be to factor this logic out so that the task generator is 
independent of the segment selector.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
##########
@@ -31,13 +31,28 @@
 public class SegmentConfig {
 
   private static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
+  private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE = "simple";
+
   private final int _maxNumRecordsPerSegment;
-  // TODO: more configs such as segment name prefix
+
+  // Currently, 'simple', 'normalizedDate' are supported
+  private final String _segmentNameGeneratorType;
+  private final String _segmentPrefix;
+  private final String _segmentPostfix;
+  private final boolean _excludeSequenceId;
 
   @JsonCreator
-  private SegmentConfig(@JsonProperty(value = "maxNumRecordsPerSegment") int 
maxNumRecordsPerSegment) {
+  private SegmentConfig(@JsonProperty(value = "maxNumRecordsPerSegment") int 
maxNumRecordsPerSegment,

Review comment:
       Will there be a backward-compatibility issue if this class is serialized/deserialized? I 
recommend using the "ignoreUnknown" annotation so that JSON written by a different 
version of the code (with fields this class does not recognize) can still be deserialized.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
##########
@@ -68,4 +73,30 @@
     }
     return runningSegments;
   }
+
+  public static Map<String, List<String>> getScheduledSegmentsMap(@Nonnull 
String taskType,

Review comment:
       Can this return segments that are no longer scheduled? For example, what if 
the task completes before the caller receives the result of this method? If so, 
are there any side effects?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import 
org.apache.pinot.controller.helix.core.minion.mergestrategy.MergeStrategyFactory;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentMergeRollupTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentMergeRollupTaskGenerator.class);
+
+  private static final int DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 20; // 20 
segments
+  private static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000; // 
5 million rows
+  private static final String DEFAULT_BUFFER_TIME_PERIOD = "14d"; // 2 weeks
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public SegmentMergeRollupTaskGenerator(ClusterInfoProvider 
clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.MergeRollupTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    // Get the segments that are being converted so that we don't submit them 
again
+    Map<String, List<String>> scheduledSegmentsMap =
+        
TaskGeneratorUtils.getScheduledSegmentsMap(MinionConstants.MergeRollupTask.TASK_TYPE,
 _clusterInfoProvider);
+
+    for (TableConfig tableConfig : tableConfigs) {
+      // Only generate tasks for OFFLINE tables
+      String offlineTableName = tableConfig.getTableName();
+      if (tableConfig.getTableType() != TableType.OFFLINE) {
+        LOGGER.warn("Skip generating MergeRollupTask for non-OFFLINE table: 
{}", offlineTableName);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkNotNull(tableTaskConfig);
+      Map<String, String> taskConfigs =
+          
tableTaskConfig.getConfigsForTaskType(MinionConstants.MergeRollupTask.TASK_TYPE);
+      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null 
for Table: {}", offlineTableName);
+
+      int tableMaxNumTasks =
+          readIntConfigWithDefaultValue(taskConfigs, 
MinionConstants.TABLE_MAX_NUM_TASKS_KEY,

Review comment:
       Should the default be Integer.MAX_VALUE?

##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentMergeRollupMinionClusterIntegrationTest.java
##########
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class SegmentMergeRollupMinionClusterIntegrationTest extends 
HybridClusterIntegrationTest {

Review comment:
       Can this be folded into the existing Minion integration test?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
##########
@@ -187,6 +203,38 @@ public void processSegments()
         Arrays.toString(_outputSegmentsDir.list()));
   }
 
+  private SegmentNameGenerator getSegmentNameGenerator(TableConfig 
tableConfig, Schema schema,
+      SegmentConfig segmentConfig) {
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableConfig.getTableName());
+    String segmentNameGeneratorType = 
segmentConfig.getSegmentNameGeneratorType();
+    if (segmentNameGeneratorType == null) {
+      segmentNameGeneratorType = SIMPLE_SEGMENT_NAME_GENERATOR;
+    }
+
+    switch (segmentNameGeneratorType) {
+      case SIMPLE_SEGMENT_NAME_GENERATOR:
+        return new SimpleSegmentNameGenerator(rawTableName, 
segmentConfig.getSegmentPostfix());
+      case NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+        Preconditions.checkState(tableConfig != null,
+            "In order to use NormalizedDateSegmentNameGenerator, table config 
must be provided");

Review comment:
       Nit: this message can be rephrased as "Table config required to use NormalizedDate..."

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java
##########
@@ -66,42 +66,58 @@ public static Schema 
convertPinotSchemaToAvroSchema(org.apache.pinot.spi.data.Sc
       if (fieldSpec.isSingleValueField()) {
         switch (dataType) {
           case INT:
-            fieldAssembler = 
fieldAssembler.name(name).type().intType().noDefault();
+            fieldAssembler =
+                
fieldAssembler.name(name).type().unionOf().intType().and().nullType().endUnion().noDefault();

Review comment:
       What's the purpose of making it a union? Also, please add the explanation to the PR 
description.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to