Jackie-Jiang commented on a change in pull request #5793:
URL: https://github.com/apache/incubator-pinot/pull/5793#discussion_r466068162



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -161,23 +185,80 @@ private void checkReplication(InstancePartitions 
instancePartitions) {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
Configuration config) {
-    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions.checkState(instancePartitions != null, "Failed to find 
OFFLINE instance partitions for table: %s",
-        _offlineTableName);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
Configuration config) {
+    InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions
+        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE 
instance partitions for table: %s",
+            _offlineTableName);
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, 
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
+
+    Map<String, Map<String, String>> subsetAssignment = currentAssignment;
+    // rebalance tiers first
+    List<Map<String, Map<String, String>>> newTierAssignments = null;
+    if (tierInstancePartitionsMap != null && 
!tierInstancePartitionsMap.isEmpty()) {

Review comment:
       Use `InstanceAssignmentConfigUtils.shouldRelocateToTiers(tableConfig)` 
instead of checking the map to determine whether to relocate segments to tiers.

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
##########
@@ -50,6 +50,13 @@ public static boolean 
shouldRelocateCompletedSegments(TableConfig tableConfig) {
         .isRelocateCompletedSegments(tableConfig.getTenantConfig());
   }
 
+  /**
+   * Returns whether relocation of segments to tiers has been enabled for this 
table
+   */
+  public static boolean shouldRelocateToTiers(TableConfig tableConfig) {
+    return tableConfig.getTierConfigsList() != null && 
!tableConfig.getTierConfigsList().isEmpty();

Review comment:
       (nit)
   ```suggestion
       return CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList());
   ```

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
##########
@@ -50,6 +50,13 @@ public static boolean 
shouldRelocateCompletedSegments(TableConfig tableConfig) {
         .isRelocateCompletedSegments(tableConfig.getTenantConfig());
   }
 
+  /**
+   * Returns whether relocation of segments to tiers has been enabled for this 
table
+   */
+  public static boolean shouldRelocateToTiers(TableConfig tableConfig) {

Review comment:
       Consider moving this into a separate class `TierConfigUtils`?

##########
File path: 
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TierConfig.java
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Config for the tiered storage and the segments which will move to that tier
+ */
+public class TierConfig extends BaseJsonConfig {
+  @JsonPropertyDescription("Name of the tier with format TIER<number>")
+  private final String _name;
+
+  @JsonPropertyDescription("The strategy for selecting segments")
+  private final String _segmentSelectorType;
+
+  @JsonPropertyDescription("For 'timeBased' segment selector, the period after 
which to select segments for this tier")
+  private final String _segmentAge;
+
+  @JsonPropertyDescription("The type of storage storage")
+  private final String _storageType;
+
+  @JsonPropertyDescription("For 'pinotServer' storageSelector, the tag with 
which to identify servers for this tier.")
+  private final String _serverTag;
+
+  // TODO: only "serverTag" is supported currently. In next iteration, 
"InstanceAssignmentConfig _instanceAssignmentConfig" will be added here
+
+  public TierConfig(@JsonProperty(value = "name", required = true) String name,
+      @JsonProperty(value = "segmentSelectorType", required = true) String 
segmentSelectorType,
+      @JsonProperty("segmentAge") @Nullable String segmentAge,
+      @JsonProperty(value = "storageType", required = true) String storageType,
+      @JsonProperty("serverTag") @Nullable String serverTag) {
+    _name = name;

Review comment:
       Add `Preconditions.checkArgument()` checks on all required (non-nullable) 
arguments to prevent bad configs.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -136,9 +143,55 @@ private static void validateIngestionConfig(@Nullable 
IngestionConfig ingestionC
         }
         // TODO: remove this once we add support for derived columns/chained 
transform functions
         if (!Collections.disjoint(transformColumns, argumentColumns)) {
-          throw new IllegalStateException("Derived columns not supported yet. 
Cannot use a transform column as argument to another transform functions");
+          throw new IllegalStateException(
+              "Derived columns not supported yet. Cannot use a transform 
column as argument to another transform functions");
         }
       }
     }
   }
+
+  /**
+   * Validates the tier configs
+   * Checks for the right segmentSelectorType and its required properties
+   * Checks for the right storageType and its required properties
+   */
+  private static void validateTierConfigList(@Nullable List<TierConfig> 
tierConfigList) {
+    if (tierConfigList == null) {
+      return;
+    }
+
+    Set<String> tierNames = new HashSet<>();
+    for (TierConfig tierConfig : tierConfigList) {
+      String tierName = tierConfig.getName();
+      Preconditions.checkState(!tierName.isEmpty());
+      Preconditions.checkState(!tierNames.contains(tierName), "Tier name: %s 
already exists in tier configs", tierName);
+      tierNames.add(tierName);

Review comment:
       (nit)
   ```suggestion
         Preconditions.checkState(tierNames.add(tierName), "Tier name: %s 
already exists in tier configs", tierName);
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -82,6 +91,10 @@ public void init(HelixManager helixManager, TableConfig 
tableConfig) {
         tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
     _partitionColumn = replicaGroupStrategyConfig != null ? 
replicaGroupStrategyConfig.getPartitionColumn() : null;
 
+    if (InstanceAssignmentConfigUtils.shouldRelocateToTiers(tableConfig)) {

Review comment:
       Move this calculation into `rebalanceTable()` because we don't need 
it for `assignSegment()`. Same for `RealtimeSegmentAssignment`.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -161,23 +185,80 @@ private void checkReplication(InstancePartitions 
instancePartitions) {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
Configuration config) {
-    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions.checkState(instancePartitions != null, "Failed to find 
OFFLINE instance partitions for table: %s",
-        _offlineTableName);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
Configuration config) {
+    InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions
+        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE 
instance partitions for table: %s",
+            _offlineTableName);
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, 
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
+
+    Map<String, Map<String, String>> subsetAssignment = currentAssignment;
+    // rebalance tiers first
+    List<Map<String, Map<String, String>>> newTierAssignments = null;
+    if (tierInstancePartitionsMap != null && 
!tierInstancePartitionsMap.isEmpty()) {
+      LOGGER.info("Rebalancing tiers: {} for table: {} with bootstrap: {}", 
tierInstancePartitionsMap.keySet(),
+          _offlineTableName, bootstrap);
+
+      // get tier to segment assignment map i.e. current assignments split by 
tiers they are eligible for
+      SegmentAssignmentUtils.TierSegmentAssignment tierSegmentAssignment =
+          new SegmentAssignmentUtils.TierSegmentAssignment(_offlineTableName, 
_sortedTiers, currentAssignment);
+      Map<String, Map<String, Map<String, String>>> 
tierNameToSegmentAssignmentMap =
+          tierSegmentAssignment.getTierNameToSegmentAssignmentMap();
+
+      // for each tier, calculate new assignment using instancePartitions for 
that tier
+      newTierAssignments = new 
ArrayList<>(tierNameToSegmentAssignmentMap.size());
+      for (Map.Entry<String, Map<String, Map<String, String>>> entry : 
tierNameToSegmentAssignmentMap.entrySet()) {
+        String tierName = entry.getKey();
+        Map<String, Map<String, String>> tierCurrentAssignment = 
entry.getValue();
+
+        InstancePartitions tierInstancePartitions = 
tierInstancePartitionsMap.get(tierName);
+        Preconditions
+            .checkNotNull(tierInstancePartitions, "Failed to find instance 
partitions for tier: %s of table: %s",
+                tierName, _offlineTableName);
+        checkReplication(tierInstancePartitions);
+
+        LOGGER.info("Rebalancing tier: {} for table: {} with instance 
partitions: {}", tierName, _offlineTableName,
+            tierInstancePartitions);
+        newTierAssignments.add(reassignSegments(tierName, 
tierCurrentAssignment, tierInstancePartitions, bootstrap));
+      }
+
+      // rest of the operations should happen only on segments which were not 
already assigned as part of tiers
+      subsetAssignment = tierSegmentAssignment.getNonTierSegmentAssignment();
+    }
+
     LOGGER.info("Rebalancing table: {} with instance partitions: {}, 
bootstrap: {}", _offlineTableName,
-        instancePartitions, bootstrap);
-    checkReplication(instancePartitions);
+        offlineInstancePartitions, bootstrap);
+    checkReplication(offlineInstancePartitions);
+    Map<String, Map<String, String>> newAssignment =
+        reassignSegments(InstancePartitionsType.OFFLINE.toString(), 
subsetAssignment, offlineInstancePartitions,
+            bootstrap);
+
+    // add tier assignments, if available
+    if (CollectionUtils.isNotEmpty(newTierAssignments)) {
+      newTierAssignments.forEach(newAssignment::putAll);
+    }
+
+    LOGGER.info("Rebalanced table: {}, number of segments to be moved to each 
instance: {}", _offlineTableName,
+        
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment));
+    return newAssignment;
+  }
 
+  /**
+   * Rebalances segments in the current assignment using the 
instancePartitions and returns new assignment
+   */
+  private Map<String, Map<String, String>> reassignSegments(String 
instancePartitionType,
+      Map<String, Map<String, String>> currentSegmentAssignment, 
InstancePartitions instancePartitions,

Review comment:
       (nit) Rename to `currentAssignment` for conciseness?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/tier/TieredStorageRelocator.java
##########
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.tier;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task to run rebalancer in background to relocate segments to 
storage tiers
+ * TODO: we could potentially get rid of tagOverrideConfig and rely on this 
relocator for moving COMPLETED segments
+ */
+public class TieredStorageRelocator extends ControllerPeriodicTask<Void> {

Review comment:
       Please keep only one segment relocator. Currently there are 2 
relocators: `TieredStorageRelocator` and `RealtimeSegmentRelocator`. They can 
rebalance the same table at the same time, which could cause problems. I recommend 
replacing `RealtimeSegmentRelocator` with a `SegmentRelocator` that handles both 
completed segment relocation and tiered storage relocation.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -161,23 +185,80 @@ private void checkReplication(InstancePartitions 
instancePartitions) {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
Configuration config) {
-    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions.checkState(instancePartitions != null, "Failed to find 
OFFLINE instance partitions for table: %s",
-        _offlineTableName);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
Configuration config) {
+    InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions
+        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE 
instance partitions for table: %s",
+            _offlineTableName);
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, 
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
+
+    Map<String, Map<String, String>> subsetAssignment = currentAssignment;
+    // rebalance tiers first
+    List<Map<String, Map<String, String>>> newTierAssignments = null;
+    if (tierInstancePartitionsMap != null && 
!tierInstancePartitionsMap.isEmpty()) {
+      LOGGER.info("Rebalancing tiers: {} for table: {} with bootstrap: {}", 
tierInstancePartitionsMap.keySet(),
+          _offlineTableName, bootstrap);
+
+      // get tier to segment assignment map i.e. current assignments split by 
tiers they are eligible for
+      SegmentAssignmentUtils.TierSegmentAssignment tierSegmentAssignment =
+          new SegmentAssignmentUtils.TierSegmentAssignment(_offlineTableName, 
_sortedTiers, currentAssignment);
+      Map<String, Map<String, Map<String, String>>> 
tierNameToSegmentAssignmentMap =
+          tierSegmentAssignment.getTierNameToSegmentAssignmentMap();
+
+      // for each tier, calculate new assignment using instancePartitions for 
that tier
+      newTierAssignments = new 
ArrayList<>(tierNameToSegmentAssignmentMap.size());
+      for (Map.Entry<String, Map<String, Map<String, String>>> entry : 
tierNameToSegmentAssignmentMap.entrySet()) {
+        String tierName = entry.getKey();
+        Map<String, Map<String, String>> tierCurrentAssignment = 
entry.getValue();
+
+        InstancePartitions tierInstancePartitions = 
tierInstancePartitionsMap.get(tierName);
+        Preconditions
+            .checkNotNull(tierInstancePartitions, "Failed to find instance 
partitions for tier: %s of table: %s",
+                tierName, _offlineTableName);
+        checkReplication(tierInstancePartitions);
+
+        LOGGER.info("Rebalancing tier: {} for table: {} with instance 
partitions: {}", tierName, _offlineTableName,

Review comment:
       (nit) Include `bootstrap` in the log?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -91,6 +104,17 @@ public void init(HelixManager helixManager, TableConfig 
tableConfig) {
     }
   }
 
+  /**
+   * Returns a sorted list of Tiers from the TierConfigList in table config.
+   * Keeps only those which have "pinotServer" storage type.
+   */
+  @VisibleForTesting
+  protected List<Tier> getSortedTiersForPinotServerStorage(List<TierConfig> 
tierConfigList) {
+    return tierConfigList.stream().filter(t -> 
TierFactory.PINOT_SERVER_STORAGE_TYPE.equals(t.getStorageType()))

Review comment:
       Prefer the old non-lambda way (a plain for-loop) for both performance and readability.
   Also consider moving this common logic into `TierUtils`.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -161,23 +185,80 @@ private void checkReplication(InstancePartitions 
instancePartitions) {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
Configuration config) {
-    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions.checkState(instancePartitions != null, "Failed to find 
OFFLINE instance partitions for table: %s",
-        _offlineTableName);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
Configuration config) {
+    InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions
+        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE 
instance partitions for table: %s",
+            _offlineTableName);
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, 
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
+
+    Map<String, Map<String, String>> subsetAssignment = currentAssignment;

Review comment:
       Rename to `nonTierAssignment` for clarity?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
##########
@@ -161,23 +185,80 @@ private void checkReplication(InstancePartitions 
instancePartitions) {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
Configuration config) {
-    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
-    Preconditions.checkState(instancePartitions != null, "Failed to find 
OFFLINE instance partitions for table: %s",
-        _offlineTableName);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
Configuration config) {
+    InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    Preconditions
+        .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE 
instance partitions for table: %s",
+            _offlineTableName);
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, 
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
+
+    Map<String, Map<String, String>> subsetAssignment = currentAssignment;
+    // rebalance tiers first

Review comment:
       (nit) Capitalize the first letter of the comment, per convention.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to