This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b8af790c2f Enable uploading segments to realtime tables (#8584)
b8af790c2f is described below

commit b8af790c2f26b5bafcc8f154e1dae2edc60b43c0
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Mon May 16 18:48:40 2022 -0700

    Enable uploading segments to realtime tables (#8584)
---
 .../apache/pinot/common/utils/SegmentUtils.java    |  32 +++--
 .../PinotSegmentUploadDownloadRestletResource.java |  13 +-
 .../helix/core/PinotHelixResourceManager.java      |  48 +++++---
 .../segment/RealtimeSegmentAssignment.java         |  50 ++++----
 ...altimeNonReplicaGroupSegmentAssignmentTest.java | 114 ++++++++++++++----
 ...NonReplicaGroupTieredSegmentAssignmentTest.java |   5 +-
 .../RealtimeReplicaGroupSegmentAssignmentTest.java | 132 ++++++++++++++++-----
 .../core/data/manager/BaseTableDataManager.java    |   3 +-
 .../manager/offline/OfflineTableDataManager.java   |  12 --
 .../manager/realtime/RealtimeTableDataManager.java |   5 +-
 .../tests/LLCRealtimeClusterIntegrationTest.java   |  96 ++++++++++++++-
 .../tests/RealtimeClusterIntegrationTest.java      |  16 ++-
 12 files changed, 392 insertions(+), 134 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
index 21b3fb4f3e..04be4a1c58 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.common.utils;
 
 import com.google.common.base.Preconditions;
-import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
@@ -35,32 +35,28 @@ public class SegmentUtils {
   // Returns the partition id of a realtime segment based on the segment name and 
segment metadata info retrieved via Helix.
   // Important: The method is costly because it may read data from zookeeper. 
Do not use it in any query execution
   // path.
-  public static int getRealtimeSegmentPartitionId(String segmentName, String 
realtimeTableName,
-      HelixManager helixManager, String partitionColumn) {
+  @Nullable
+  public static Integer getRealtimeSegmentPartitionId(String segmentName, 
String realtimeTableName,
+      HelixManager helixManager,
+      String partitionColumn) {
     // A fast path if the segmentName is a LLC segment name and we can get the 
partition id from the name directly.
     if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
       return new LLCSegmentName(segmentName).getPartitionGroupId();
     }
-    // Otherwise, retrieve the partition id from the segment zk metadata. 
Currently only realtime segments from upsert
-    // enabled tables have partition ids in their segment metadata.
+    // Otherwise, retrieve the partition id from the segment zk metadata.
     SegmentZKMetadata segmentZKMetadata =
         
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), 
realtimeTableName, segmentName);
     Preconditions
         .checkState(segmentZKMetadata != null, "Failed to find segment ZK 
metadata for segment: %s of table: %s",
             segmentName, realtimeTableName);
     SegmentPartitionMetadata segmentPartitionMetadata = 
segmentZKMetadata.getPartitionMetadata();
-    Preconditions.checkState(segmentPartitionMetadata != null,
-        "Segment ZK metadata for segment: %s of table: %s does not contain 
partition metadata", segmentName,
-        realtimeTableName);
-    ColumnPartitionMetadata columnPartitionMetadata =
-        segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn);
-    Preconditions.checkState(columnPartitionMetadata != null,
-        "Segment ZK metadata for segment: %s of table: %s does not contain 
partition metadata for column: %s. Check "
-            + "if the table is an upsert table.", segmentName, 
realtimeTableName, partitionColumn);
-    Set<Integer> partitions = columnPartitionMetadata.getPartitions();
-    Preconditions.checkState(partitions.size() == 1,
-        "Segment ZK metadata for segment: %s of table: %s contains multiple 
partitions for column: %s with %s",
-        segmentName, realtimeTableName, partitionColumn, partitions);
-    return partitions.iterator().next();
+    if (segmentPartitionMetadata != null) {
+      ColumnPartitionMetadata columnPartitionMetadata =
+          
segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn);
+      if (columnPartitionMetadata != null && 
columnPartitionMetadata.getPartitions().size() == 1) {
+        return columnPartitionMetadata.getPartitions().iterator().next();
+      }
+    }
+    return null;
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 01aa8c832b..0590fb2b1a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -292,16 +292,9 @@ public class PinotSegmentUploadDownloadRestletResource {
         LOGGER.warn("Table name is not provided as request query parameter 
when uploading segment: {} for table: {}",
             segmentName, rawTableName);
       }
-      String tableNameWithType;
-      if (tableType == TableType.OFFLINE) {
-        tableNameWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
-      } else {
-        tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
-        if (!_pinotHelixResourceManager.isUpsertTable(tableNameWithType)) {
-          throw new ControllerApplicationException(LOGGER,
-              "Cannot upload segment to non-upsert real-time table: " + 
tableNameWithType, Response.Status.FORBIDDEN);
-        }
-      }
+      String tableNameWithType = tableType == TableType.OFFLINE
+          ? TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
+          : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
 
       String clientAddress = 
InetAddress.getByName(request.getRemoteAddr()).getHostName();
       LOGGER.info("Processing upload request for segment: {} of table: {} with 
upload type: {} from client: {}, "
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f9d0e86447..49fe4cdd9e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -133,6 +133,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableCustomConfig;
 import org.apache.pinot.spi.config.table.TableStats;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TagOverrideConfig;
 import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -167,7 +168,7 @@ public class PinotHelixResourceManager {
 
   // TODO: make this configurable
   public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 
60_000L; // 10 minutes
-  public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 
secondL
+  public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 
second
 
   private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new 
SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
 
@@ -1997,26 +1998,14 @@ public class PinotHelixResourceManager {
   public void assignTableSegment(String tableNameWithType, String segmentName) 
{
     String segmentZKMetadataPath =
         
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, 
segmentName);
-    InstancePartitionsType instancePartitionsType;
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      // In an upsert enabled LLC realtime table, all segments of the same 
partition are collocated on the same server
-      // -- consuming or completed. So it is fine to use CONSUMING as the 
InstancePartitionsType.
-      // TODO When upload segments is open to all realtime tables, we should 
change the type to COMPLETED instead.
-      // In addition, RealtimeSegmentAssignment.assignSegment(..) method 
should be updated so that the method does not
-      // assign segments to CONSUMING instance partition only.
-      instancePartitionsType = InstancePartitionsType.CONSUMING;
-    } else {
-      instancePartitionsType = InstancePartitionsType.OFFLINE;
-    }
 
     // Assign instances for the segment and add it into IdealState
     try {
       TableConfig tableConfig = getTableConfig(tableNameWithType);
       Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: " + tableNameWithType);
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+          fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
       SegmentAssignment segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
Collections
-          .singletonMap(instancePartitionsType, InstancePartitionsUtils
-              .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, 
instancePartitionsType));
       synchronized (getTableUpdaterLock(tableNameWithType)) {
         HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, 
idealState -> {
           assert idealState != null;
@@ -2050,6 +2039,35 @@ public class PinotHelixResourceManager {
     }
   }
 
+  private Map<InstancePartitionsType, InstancePartitions> 
fetchOrComputeInstancePartitions(String tableNameWithType,
+      TableConfig tableConfig) {
+    if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+      return Collections.singletonMap(InstancePartitionsType.OFFLINE, 
InstancePartitionsUtils
+          .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, 
InstancePartitionsType.OFFLINE));
+    }
+    if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
+      // In an upsert enabled LLC realtime table, all segments of the same 
partition are collocated on the same server
+      // -- consuming or completed. So it is fine to use CONSUMING as the 
InstancePartitionsType.
+      return Collections.singletonMap(InstancePartitionsType.CONSUMING, 
InstancePartitionsUtils
+          .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, 
InstancePartitionsType.CONSUMING));
+    }
+    // For non-upsert realtime tables, if COMPLETED instance partitions are 
available or a tag override for
+    // completed segments is provided in the tenant config, the COMPLETED instance 
partitions type is used;
+    // otherwise the CONSUMING instance partitions type is used.
+    InstancePartitionsType instancePartitionsType = 
InstancePartitionsType.COMPLETED;
+    InstancePartitions instancePartitions = 
InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+        InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, 
instancePartitionsType.toString()));
+    if (instancePartitions != null) {
+      return Collections.singletonMap(instancePartitionsType, 
instancePartitions);
+    }
+    TagOverrideConfig tagOverrideConfig = 
tableConfig.getTenantConfig().getTagOverrideConfig();
+    if (tagOverrideConfig == null || tagOverrideConfig.getRealtimeCompleted() 
== null) {
+      instancePartitionsType = InstancePartitionsType.CONSUMING;
+    }
+    return Collections.singletonMap(instancePartitionsType,
+        
InstancePartitionsUtils.computeDefaultInstancePartitions(_helixZkManager, 
tableConfig, instancePartitionsType));
+  }
+
   public boolean isUpsertTable(String tableName) {
     TableConfig realtimeTableConfig = 
getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName));
     if (realtimeTableConfig == null) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index c97b53e75c..3ede57c6d7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -104,18 +104,20 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
   @Override
   public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
-    Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
-        _realtimeTableName);
+    Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance 
partition type should be provided");
+    Map.Entry<InstancePartitionsType, InstancePartitions> 
typeToInstancePartitions =
+        instancePartitionsMap.entrySet().iterator().next();
+    InstancePartitionsType instancePartitionsType = 
typeToInstancePartitions.getKey();
+    InstancePartitions instancePartitions = 
typeToInstancePartitions.getValue();
     Preconditions
         .checkState(instancePartitions.getNumPartitions() == 1, "Instance 
partitions: %s should contain 1 partition",
             instancePartitions.getInstancePartitionsName());
     LOGGER.info("Assigning segment: {} with instance partitions: {} for table: 
{}", segmentName, instancePartitions,
         _realtimeTableName);
     checkReplication(instancePartitions);
-
-    List<String> instancesAssigned = assignConsumingSegment(segmentName, 
instancePartitions);
-
+    List<String> instancesAssigned = instancePartitionsType == 
InstancePartitionsType.CONSUMING
+        ? assignConsumingSegment(segmentName, instancePartitions)
+        : assignCompletedSegment(segmentName, currentAssignment, 
instancePartitions);
     LOGGER.info("Assigned segment: {} to instances: {} for table: {}", 
segmentName, instancesAssigned,
         _realtimeTableName);
     return instancesAssigned;
@@ -141,9 +143,7 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
    * Helper method to assign instances for CONSUMING segment based on the 
segment partition id and instance partitions.
    */
   private List<String> assignConsumingSegment(String segmentName, 
InstancePartitions instancePartitions) {
-    int partitionGroupId =
-        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_realtimeTableName, _helixManager, _partitionColumn);
-
+    int partitionGroupId = getPartitionGroupId(segmentName);
     int numReplicaGroups = instancePartitions.getNumReplicaGroups();
     if (numReplicaGroups == 1) {
       // Non-replica-group based assignment:
@@ -158,7 +158,8 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
       int numInstances = instances.size();
       List<String> instancesAssigned = new ArrayList<>(_replication);
       for (int replicaId = 0; replicaId < _replication; replicaId++) {
-        instancesAssigned.add(instances.get((partitionGroupId * _replication + 
replicaId) % numInstances));
+        int instanceIndex = (partitionGroupId * _replication + replicaId) % 
numInstances;
+        instancesAssigned.add(instances.get(instanceIndex));
       }
       return instancesAssigned;
     } else {
@@ -185,8 +186,9 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
 
     InstancePartitions completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
     InstancePartitions consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
-    Preconditions.checkState(consumingInstancePartitions != null,
-        "Failed to find COMPLETED or CONSUMING instance partitions for table: 
%s", _realtimeTableName);
+    Preconditions
+        .checkState(consumingInstancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
+            _realtimeTableName);
     Preconditions.checkState(consumingInstancePartitions.getNumPartitions() == 
1,
         "Instance partitions: %s should contain 1 partition", 
consumingInstancePartitions.getInstancePartitionsName());
     boolean includeConsuming = config
@@ -331,8 +333,7 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
         int numPartitions = instancePartitions.getNumPartitions();
         Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new 
HashMap<>();
         for (String segmentName : currentAssignment.keySet()) {
-          int partitionGroupId = SegmentUtils
-              .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, 
_helixManager, _partitionColumn);
+          int partitionGroupId = getPartitionGroupId(segmentName);
           int instancePartitionId = partitionGroupId % numPartitions;
           
instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new 
ArrayList<>())
               .add(segmentName);
@@ -368,12 +369,21 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
       // Replica-group based assignment
 
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId =
-          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_realtimeTableName, _helixManager, _partitionColumn);
-      int numPartitions = instancePartitions.getNumPartitions();
-      int partitionGroupId = segmentPartitionId % numPartitions;
-      return SegmentAssignmentUtils
-          .assignSegmentWithReplicaGroup(currentAssignment, 
instancePartitions, partitionGroupId);
+      int partitionId = getPartitionGroupId(segmentName) % 
instancePartitions.getNumPartitions();
+      return 
SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, 
instancePartitions, partitionId);
+    }
+  }
+
+  private int getPartitionGroupId(String segmentName) {
+    Integer segmentPartitionId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_realtimeTableName, _helixManager, _partitionColumn);
+    if (segmentPartitionId == null) {
+      // This case is for the uploaded segments for which there's no partition 
information.
+      // A random, but consistent, partition id is calculated based on the 
hash code of the segment name.
+      // Note that '% 10K' is used to prevent having partition ids with large 
value which will be problematic later in
+      // instance assignment formula.
+      segmentPartitionId = Math.abs(segmentName.hashCode() % 10_000);
     }
+    return segmentPartitionId;
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 1fe98d8eba..19d5235d77 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -18,11 +18,16 @@
  */
 package org.apache.pinot.controller.helix.core.assignment.segment;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -34,6 +39,11 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
@@ -71,7 +81,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
             .setLLC(true).build();
-    _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, 
tableConfig);
+    _segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), 
tableConfig);
 
     _instancePartitionsMap = new TreeMap<>();
     // CONSUMING instances:
@@ -102,11 +112,13 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest 
{
 
   @Test
   public void testAssignSegment() {
+    Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
+        ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = _segments.get(segmentId);
       List<String> instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
 
       // Segment 0 (partition 0) should be assigned to instance 0, 1, 2
@@ -128,11 +140,13 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest 
{
 
   @Test
   public void testRelocateCompletedSegments() {
+    Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
+        ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = _segments.get(segmentId);
       List<String> instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       addToAssignment(currentAssignment, segmentId, instancesAssigned);
     }
 
@@ -150,10 +164,31 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest 
{
             SegmentStateModel.OFFLINE);
     currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
 
+    // Add uploaded ONLINE segments to the consuming instances (i.e. no 
separation between consuming & completed)
+    List<String> uploadedSegmentNames = ImmutableList.of("UploadedSegment1", 
"UploadedSegment2");
+    onlyConsumingInstancePartitionMap =
+        ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+    for (String uploadedSegName : uploadedSegmentNames) {
+      List<String> instancesAssigned =
+          _segmentAssignment.assignSegment(uploadedSegName, currentAssignment, 
onlyConsumingInstancePartitionMap);
+      currentAssignment.put(uploadedSegName,
+          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE));
+    }
+    // Now there should be 103 segments assigned
+    assertEquals(currentAssignment.size(), NUM_SEGMENTS + 
uploadedSegmentNames.size() + 1);
+    // Each segment should have 3 replicas and all assigned instances should 
be prefixed with consuming
+    currentAssignment.forEach((type, instanceStateMap) -> {
+      assertEquals(instanceStateMap.size(), NUM_REPLICAS);
+      instanceStateMap.forEach((instance, state) -> {
+        if (!instance.startsWith("badInstance_")) {
+          assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
+        }
+      });
+    });
+
     // Rebalance without COMPLETED instance partitions should not change the 
segment assignment
-    Map<InstancePartitionsType, InstancePartitions> 
noRelocationInstancePartitionsMap = new TreeMap<>();
-    noRelocationInstancePartitionsMap
-        .put(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+    Map<InstancePartitionsType, InstancePartitions> 
noRelocationInstancePartitionsMap =
+        ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     assertEquals(_segmentAssignment
             .rebalanceTable(currentAssignment, 
noRelocationInstancePartitionsMap, null, null, new BaseConfiguration()),
         currentAssignment);
@@ -162,29 +197,36 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest 
{
     // instances
     Map<String, Map<String, String>> newAssignment = _segmentAssignment
         .rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, 
new BaseConfiguration());
-    assertEquals(newAssignment.size(), NUM_SEGMENTS + 1);
+    assertEquals(newAssignment.size(), NUM_SEGMENTS + 
uploadedSegmentNames.size() + 1);
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
-        // COMPLETED (ONLINE) segments
-        Map<String, String> instanceStateMap = 
newAssignment.get(_segments.get(segmentId));
-        for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
-          
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
-          assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
-        }
+        // check COMPLETED (ONLINE) segments
+        newAssignment.get(_segments.get(segmentId)).forEach((instance, state) 
-> {
+          assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
+          assertEquals(state, SegmentStateModel.ONLINE);
+        });
       } else {
-        // CONSUMING segments
-        Map<String, String> instanceStateMap = 
newAssignment.get(_segments.get(segmentId));
-        for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
-          
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
-          assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
-        }
+        // check CONSUMING segments
+        newAssignment.get(_segments.get(segmentId)).forEach((instance, state) 
-> {
+          assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
+          assertEquals(state, SegmentStateModel.CONSUMING);
+        });
       }
     }
-    // Relocated segments should be balanced (each instance should have at 
least 28 segments assigned)
+    // check the uploaded segments
+    for (String uploadedSegName : uploadedSegmentNames) {
+      newAssignment.get(uploadedSegName).forEach((instance, state) -> {
+        assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
+        assertEquals(state, SegmentStateModel.ONLINE);
+      });
+    }
+
+    // Relocated segments should be balanced (each instance should have at 
least 29 segments assigned)
     int[] numSegmentsAssignedPerInstance =
         
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, 
COMPLETED_INSTANCES);
     assertEquals(numSegmentsAssignedPerInstance.length, 
NUM_COMPLETED_INSTANCES);
-    int expectedMinNumSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) * 
NUM_REPLICAS / NUM_COMPLETED_INSTANCES;
+    int expectedMinNumSegmentsPerInstance =
+        (NUM_SEGMENTS - NUM_PARTITIONS) * NUM_REPLICAS / 
NUM_COMPLETED_INSTANCES + 1;
     for (int i = 0; i < NUM_COMPLETED_INSTANCES; i++) {
       assertTrue(numSegmentsAssignedPerInstance[i] >= 
expectedMinNumSegmentsPerInstance);
     }
@@ -232,6 +274,28 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
     }
   }
 
+  @Test
+  public void testAssignSegmentForUploadedSegments() {
+    // CONSUMING instance partition has been tested in previous method, only 
test COMPLETED here
+    Map<InstancePartitionsType, InstancePartitions> 
onlyCompletedInstancePartitionMap =
+        ImmutableMap.of(InstancePartitionsType.COMPLETED, 
_instancePartitionsMap.get(InstancePartitionsType.COMPLETED));
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    Map<String, List<String>> expectedUploadedSegmentToInstances = 
ImmutableMap.of(
+        "uploadedSegment_0", ImmutableList.of("completedInstance_0", 
"completedInstance_1", "completedInstance_2"),
+        "uploadedSegment_1", ImmutableList.of("completedInstance_3", 
"completedInstance_4", "completedInstance_5"),
+        "uploadedSegment_2", ImmutableList.of("completedInstance_6", 
"completedInstance_7", "completedInstance_8"),
+        "uploadedSegment_3", ImmutableList.of("completedInstance_9", 
"completedInstance_0", "completedInstance_1"),
+        "uploadedSegment_4", ImmutableList.of("completedInstance_2", 
"completedInstance_3", "completedInstance_4")
+    );
+    expectedUploadedSegmentToInstances.forEach((segmentName, 
expectedInstances) -> {
+      List<String> actualInstances =
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyCompletedInstancePartitionMap);
+      assertEquals(actualInstances, expectedInstances);
+      currentAssignment
+          .put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(actualInstances, 
SegmentStateModel.ONLINE));
+    });
+  }
+
   private void addToAssignment(Map<String, Map<String, String>> 
currentAssignment, int segmentId,
       List<String> instancesAssigned) {
     // Change the state of the last segment in the same partition from 
CONSUMING to ONLINE if exists
@@ -246,4 +310,12 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
     currentAssignment.put(_segments.get(segmentId),
         SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
   }
+
+  private HelixManager createHelixManager() {
+    HelixManager helixManager = mock(HelixManager.class);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    when(propertyStore.get(anyString(), isNull(), anyInt())).thenReturn(new 
ZNRecord("0"));
+    return helixManager;
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index 1724d0aeb8..97aa23aab2 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core.assignment.segment;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -162,11 +163,13 @@ public class 
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
 
   @Test
   public void testRelocateCompletedSegments() {
+    Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
+        ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = _segments.get(segmentId);
       List<String> instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       addToAssignment(currentAssignment, segmentId, instancesAssigned);
     }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index 09cd07bc9e..ba8929d267 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -18,12 +18,16 @@
  */
 package org.apache.pinot.controller.helix.core.assignment.segment;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -36,6 +40,11 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
@@ -74,7 +83,7 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
             
.setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
             .build();
-    _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null, 
tableConfig);
+    _segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), 
tableConfig);
 
     _instancePartitionsMap = new TreeMap<>();
     // CONSUMING instances:
@@ -116,19 +125,16 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
     _instancePartitionsMap.put(InstancePartitionsType.COMPLETED, 
completedInstancePartitions);
   }
 
-  @Test
-  public void testFactory() {
-    assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment);
-  }
-
   @Test
   public void testAssignSegment() {
+    Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
+        ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = _segments.get(segmentId);
       List<String> instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
 
       // Segment 0 (partition 0) should be assigned to instance 0, 3, 6
@@ -151,11 +157,13 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
 
   @Test
   public void testRelocateCompletedSegments() {
+    Map<InstancePartitionsType, InstancePartitions> 
onlyConsumingInstancePartitionMap =
+        ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = _segments.get(segmentId);
       List<String> instancesAssigned =
-          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, 
onlyConsumingInstancePartitionMap);
       addToAssignment(currentAssignment, segmentId, instancesAssigned);
     }
 
@@ -173,10 +181,31 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
             SegmentStateModel.OFFLINE);
     currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
 
+    // Add 3 uploaded ONLINE segments to the consuming instances (i.e. no separation between consuming & completed)
+    List<String> uploadedSegmentNames = ImmutableList.of("UploadedSegment0", "UploadedSegment1", "UploadedSegment2");
+    onlyConsumingInstancePartitionMap =
+        ImmutableMap.of(InstancePartitionsType.CONSUMING, _instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+    for (String uploadedSegName : uploadedSegmentNames) {
+      List<String> instancesAssigned =
+          _segmentAssignment.assignSegment(uploadedSegName, currentAssignment, onlyConsumingInstancePartitionMap);
+      currentAssignment.put(uploadedSegName,
+          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
+    }
+
+    assertEquals(currentAssignment.size(), NUM_SEGMENTS + uploadedSegmentNames.size() + 1);
+    // Each segment should have 3 replicas and all assigned instances should be prefixed with consuming
+    currentAssignment.forEach((type, instanceStateMap) -> {
+      assertEquals(instanceStateMap.size(), NUM_REPLICAS);
+      instanceStateMap.forEach((instance, state) -> {
+        if (!instance.startsWith("badInstance_")) {
+          assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
+        }
+      });
+    });
+
     // Rebalance without COMPLETED instance partitions should not change the 
segment assignment
-    Map<InstancePartitionsType, InstancePartitions> 
noRelocationInstancePartitionsMap = new TreeMap<>();
-    noRelocationInstancePartitionsMap
-        .put(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+    Map<InstancePartitionsType, InstancePartitions> 
noRelocationInstancePartitionsMap =
+        ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     assertEquals(_segmentAssignment
             .rebalanceTable(currentAssignment, 
noRelocationInstancePartitionsMap, null, null, new BaseConfiguration()),
         currentAssignment);
@@ -185,31 +214,38 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
     // instances
     Map<String, Map<String, String>> newAssignment = _segmentAssignment
         .rebalanceTable(currentAssignment, _instancePartitionsMap, null, null, 
new BaseConfiguration());
-    assertEquals(newAssignment.size(), NUM_SEGMENTS + 1);
+    assertEquals(newAssignment.size(), NUM_SEGMENTS + 
uploadedSegmentNames.size() + 1);
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
-        // COMPLETED (ONLINE) segments
-        Map<String, String> instanceStateMap = 
newAssignment.get(_segments.get(segmentId));
-        for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
-          
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
-          assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
-        }
+        // check COMPLETED (ONLINE) segments
+        newAssignment.get(_segments.get(segmentId)).forEach((instance, state) 
-> {
+          assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
+          assertEquals(state, SegmentStateModel.ONLINE);
+        });
       } else {
-        // CONSUMING segments
-        Map<String, String> instanceStateMap = 
newAssignment.get(_segments.get(segmentId));
-        for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
-          
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
-          assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
-        }
+        // check CONSUMING segments
+        newAssignment.get(_segments.get(segmentId)).forEach((instance, state) 
-> {
+          assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
+          assertEquals(state, SegmentStateModel.CONSUMING);
+        });
       }
     }
-    // Relocated segments should be balanced (each instance should have 24 
segments assigned)
+    // check the uploaded segments
+    for (String uploadedSegName : uploadedSegmentNames) {
+      newAssignment.get(uploadedSegName).forEach((instance, state) -> {
+        assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
+        assertEquals(state, SegmentStateModel.ONLINE);
+      });
+    }
+
+    // Relocated segments should be balanced
     int[] numSegmentsAssignedPerInstance =
         
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, 
COMPLETED_INSTANCES);
-    int[] expectedNumSegmentsAssignedPerInstance = new 
int[NUM_COMPLETED_INSTANCES];
-    int numSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) * 
NUM_REPLICAS / NUM_COMPLETED_INSTANCES;
-    Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
-    assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
+    int expectedNumSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) * 
NUM_REPLICAS / NUM_COMPLETED_INSTANCES;
+    for (int actualNumSegments : numSegmentsAssignedPerInstance) {
+      assertTrue(actualNumSegments == expectedNumSegmentsPerInstance
+          || actualNumSegments == expectedNumSegmentsPerInstance + 1);
+    }
 
     // Rebalance with COMPLETED instance partitions including CONSUMING 
segments should give the same assignment
     BaseConfiguration rebalanceConfig = new BaseConfiguration();
@@ -256,6 +292,34 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
     }
   }
 
+  @Test
+  public void testAssignSegmentForUploadedSegments() {
+    // CONSUMING instance partition has been tested in previous method, only test COMPLETED here
+    Map<InstancePartitionsType, InstancePartitions> onlyCompletedInstancePartitionMap =
+        ImmutableMap.of(InstancePartitionsType.COMPLETED, _instancePartitionsMap.get(InstancePartitionsType.COMPLETED));
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    // COMPLETED instances:
+    // {
+    //   0_0=[instance_0, instance_1, instance_2, instance_3],
+    //   0_1=[instance_4, instance_5, instance_6, instance_7],
+    //   0_2=[instance_8, instance_9, instance_10, instance_11]
+    // }
+    Map<String, List<String>> expectedUploadedSegmentToInstances = ImmutableMap.of(
+        "uploadedSegment_0", ImmutableList.of("completedInstance_0", "completedInstance_4", "completedInstance_8"),
+        "uploadedSegment_1", ImmutableList.of("completedInstance_1", "completedInstance_5", "completedInstance_9"),
+        "uploadedSegment_2", ImmutableList.of("completedInstance_2", "completedInstance_6", "completedInstance_10"),
+        "uploadedSegment_3", ImmutableList.of("completedInstance_3", "completedInstance_7", "completedInstance_11"),
+        "uploadedSegment_4", ImmutableList.of("completedInstance_0", "completedInstance_4", "completedInstance_8")
+    );
+    expectedUploadedSegmentToInstances.forEach((segmentName, expectedInstances) -> {
+      List<String> actualInstances =
+          _segmentAssignment.assignSegment(segmentName, currentAssignment, onlyCompletedInstancePartitionMap);
+      assertEquals(actualInstances, expectedInstances);
+      currentAssignment
+          .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(actualInstances, SegmentStateModel.ONLINE));
+    });
+  }
+
   private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId,
       List<String> instancesAssigned) {
     // Change the state of the last segment in the same partition from CONSUMING to ONLINE if exists
@@ -270,4 +334,12 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
     currentAssignment.put(_segments.get(segmentId),
         SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
   }
+
+  private HelixManager createHelixManager() {
+    HelixManager helixManager = mock(HelixManager.class);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    when(propertyStore.get(anyString(), isNull(), anyInt())).thenReturn(new ZNRecord("0"));
+    return helixManager;
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 6276db12c0..5b28f04fd3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -189,7 +189,8 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   @Override
   public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
       throws Exception {
-    throw new UnsupportedOperationException();
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
+    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
index e3cf839b61..7e17da42cb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
@@ -18,13 +18,8 @@
  */
 package org.apache.pinot.core.data.manager.offline;
 
-import java.io.File;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.core.data.manager.BaseTableDataManager;
-import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.spi.data.Schema;
 
 
 /**
@@ -44,11 +39,4 @@ public class OfflineTableDataManager extends 
BaseTableDataManager {
   @Override
   protected void doShutdown() {
   }
-
-  @Override
-  public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
-      throws Exception {
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
schema));
-  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 730bb81752..198023632c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -369,8 +369,11 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
 
   private void handleUpsert(ImmutableSegmentImpl immutableSegment) {
     String segmentName = immutableSegment.getSegmentName();
-    int partitionGroupId = SegmentUtils
+    Integer partitionGroupId = SegmentUtils
         .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
+    Preconditions.checkNotNull(partitionGroupId, String
+        .format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
+            _tableNameWithType));
     PartitionUpsertMetadataManager partitionUpsertMetadataManager =
         _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 864f6b7287..7fb120cfb7 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -21,16 +21,24 @@ package org.apache.pinot.integration.tests;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.commons.io.FileUtils;
+import org.apache.http.HttpStatus;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.ReadMode;
@@ -42,6 +50,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -103,6 +112,85 @@ public class LLCRealtimeClusterIntegrationTest extends 
RealtimeClusterIntegratio
     }
   }
 
+  @Override
+  protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, TableConfig tableConfig)
+      throws Exception {
+    if (!_tarDir.exists()) {
+      _tarDir.mkdir();
+    }
+    if (!_segmentDir.exists()) {
+      _segmentDir.mkdir();
+    }
+
+    // create segments out of the avro files (segments will be placed in _tarDir)
+    List<File> copyOfAvroFiles = new ArrayList<>(avroFiles);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(copyOfAvroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+
+    // upload segments to controller
+    uploadSegmentsToController(getTableName(), _tarDir, false, false);
+
+    // upload the first segment again to verify refresh
+    uploadSegmentsToController(getTableName(), _tarDir, true, false);
+
+    // upload the first segment again to verify refresh with different segment crc
+    uploadSegmentsToController(getTableName(), _tarDir, true, true);
+
+    // add avro files to the original list so H2 will have the uploaded data as well
+    avroFiles.addAll(copyOfAvroFiles);
+  }
+
+  private void uploadSegmentsToController(String tableName, File tarDir, boolean onlyFirstSegment, boolean changeCrc)
+      throws Exception {
+    File[] segmentTarFiles = tarDir.listFiles();
+    assertNotNull(segmentTarFiles);
+    int numSegments = segmentTarFiles.length;
+    assertTrue(numSegments > 0);
+    if (onlyFirstSegment) {
+      numSegments = 1;
+    }
+    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      if (numSegments == 1) {
+        File segmentTarFile = segmentTarFiles[0];
+        if (changeCrc) {
+          changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString());
+        }
+        assertEquals(fileUploadDownloadClient
+            .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName,
+                TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK);
+      } else {
+        // Upload segments in parallel
+        ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
+        List<Future<Integer>> futures = new ArrayList<>(numSegments);
+        for (File segmentTarFile : segmentTarFiles) {
+          futures.add(executorService.submit(() -> fileUploadDownloadClient
+              .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName,
+                  TableType.REALTIME).getStatusCode()));
+        }
+        executorService.shutdown();
+        for (Future<Integer> future : futures) {
+          assertEquals((int) future.get(), HttpStatus.SC_OK);
+        }
+      }
+    }
+  }
+
+  private void changeCrcInSegmentZKMetadata(String tableName, String segmentFilePath) {
+    int startIdx = segmentFilePath.indexOf("mytable_");
+    int endIdx = segmentFilePath.indexOf(".tar.gz");
+    String segmentName = segmentFilePath.substring(startIdx, endIdx);
+    String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+    segmentZKMetadata.setCrc(111L);
+    _helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata);
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    // all the data that was ingested from Kafka also got uploaded via the controller's upload endpoint
+    return super.getCountStarResult() * 2;
+  }
+
   @BeforeClass
   @Override
   public void setUp()
@@ -138,9 +226,11 @@ public class LLCRealtimeClusterIntegrationTest extends 
RealtimeClusterIntegratio
     String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
     List<SegmentZKMetadata> segmentsZKMetadata =
         ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, 
realtimeTableName);
-    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
-      assertEquals(segmentZKMetadata.getSizeThresholdToFlushSegment(),
-          getRealtimeSegmentFlushSize() / getNumKafkaPartitions());
+    for (SegmentZKMetadata segMetadata : segmentsZKMetadata) {
+      if (segMetadata.getStatus() != 
CommonConstants.Segment.Realtime.Status.UPLOADED) {
+        assertEquals(segMetadata.getSizeThresholdToFlushSegment(),
+            getRealtimeSegmentFlushSize() / getNumKafkaPartitions());
+      }
     }
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 6208c36c5b..dfa7a34d82 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -23,6 +23,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -55,12 +57,17 @@ public class RealtimeClusterIntegrationTest extends 
BaseClusterIntegrationTestSe
     List<File> avroFiles = unpackAvroData(_tempDir);
 
     // Create and upload the schema and table config
-    addSchema(createSchema());
-    addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+    addTableConfig(tableConfig);
 
     // Push data into Kafka
     pushAvroIntoKafka(avroFiles);
 
+    // create segments and upload them to controller
+    createSegmentsAndUpload(avroFiles, schema, tableConfig);
+
     // Set up the H2 connection
     setUpH2Connection(avroFiles);
 
@@ -71,6 +78,11 @@ public class RealtimeClusterIntegrationTest extends 
BaseClusterIntegrationTestSe
     waitForAllDocsLoaded(600_000L);
   }
 
+  protected void createSegmentsAndUpload(List<File> avroFile, Schema schema, TableConfig tableConfig)
+      throws Exception {
+    // Do nothing. This is specific to LLC use cases for now.
+  }
+
   @Override
   protected void overrideServerConf(PinotConfiguration configuration) {
     
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION,
 false);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to