This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch deep.store.dir.structure
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/deep.store.dir.structure by this push:
     new 48d7602  Revert deep store directory structure changes after introducing upsert (#6974)
48d7602 is described below

commit 48d760248917e621b635b01982bce1e5805b7975
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Tue May 25 12:16:21 2021 -0700

    Revert deep store directory structure changes after introducing upsert (#6974)
---
 .../metadata/segment/ColumnPartitionMetadata.java  |   4 +-
 .../common/utils/FileUploadDownloadClient.java     |  24 ---
 .../apache/pinot/common/utils/SegmentUtils.java    |  68 -------
 .../apache/pinot/controller/ControllerConf.java    |   5 +-
 .../PinotSegmentUploadDownloadRestletResource.java |  68 +++----
 .../pinot/controller/api/upload/ZKOperator.java    |  55 +++---
 .../helix/core/PinotHelixResourceManager.java      |  96 +++------
 .../segment/RealtimeSegmentAssignment.java         |  35 ++--
 .../helix/core/util/ZKMetadataUtils.java           |  32 +--
 .../api/PinotSegmentRestletResourceTest.java       |   6 +-
 .../pinot/controller/api/TableViewsTest.java       |   2 +-
 .../controller/api/upload/ZKOperatorTest.java      |  10 +-
 .../helix/ControllerInstanceToggleTest.java        |   2 +-
 .../controller/helix/ControllerSentinelTestV2.java |   6 +-
 .../controller/helix/PinotResourceManagerTest.java |  99 ++--------
 .../TableRebalancerClusterStatelessTest.java       |   4 +-
 .../helix/core/retention/RetentionManagerTest.java |   4 +-
 .../controller/utils/SegmentMetadataMockUtils.java |   8 -
 .../validation/ValidationManagerTest.java          |   4 +-
 .../manager/realtime/RealtimeTableDataManager.java |  26 +--
 .../tests/BaseClusterIntegrationTest.java          |  25 ---
 .../tests/BasicAuthRealtimeIntegrationTest.java    |   6 +-
 .../UpsertTableSegmentUploadIntegrationTest.java   | 219 ---------------------
 .../src/test/resources/upsert_table_test.schema    |  33 ----
 .../src/test/resources/upsert_test.tar.gz          | Bin 9911 -> 0 bytes
 .../upsert/PartitionUpsertMetadataManager.java     |   2 -
 .../apache/pinot/spi/utils/CommonConstants.java    |   7 +-
 .../pinot/tools/perf/PerfBenchmarkDriver.java      |   5 +-
 .../pinot/tools/perf/PerfBenchmarkRunner.java      |   2 +-
 29 files changed, 155 insertions(+), 702 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
index 537f01e..0220c4f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
@@ -34,8 +34,8 @@ import org.apache.commons.lang3.StringUtils;
 /**
  * Class for partition related column metadata:
  * <ul>
- *   <li>The name of the Partition function used to map the column values to 
their partitions</li>
- *   <li>Total number of partitions</li>
+ *   <li>Partition function</li>
+ *   <li>Number of total partitions</li>
  *   <li>Set of partitions the column contains</li>
  * </ul>
  */
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index ba32144..c874891 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -62,7 +62,6 @@ import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.util.EntityUtils;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
@@ -92,7 +91,6 @@ public class FileUploadDownloadClient implements Closeable {
   public static class QueryParameters {
     public static final String ENABLE_PARALLEL_PUSH_PROTECTION = 
"enableParallelPushProtection";
     public static final String TABLE_NAME = "tableName";
-    public static final String TABLE_TYPE = "tableType";
   }
 
   public enum FileUploadType {
@@ -690,28 +688,6 @@ public class FileUploadDownloadClient implements Closeable 
{
   }
 
   /**
-   * Upload segment with segment file using default settings. Include table 
name and type as a request parameters.
-   *
-   * @param uri URI
-   * @param segmentName Segment name
-   * @param segmentFile Segment file
-   * @param tableName Table name with or without type suffix
-   * @param tableType Table type
-   * @return Response
-   * @throws IOException
-   * @throws HttpErrorStatusException
-   */
-  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File 
segmentFile, String tableName,
-      TableType tableType)
-      throws IOException, HttpErrorStatusException {
-    // Add table name and type request parameters
-    NameValuePair tableNameValuePair = new 
BasicNameValuePair(QueryParameters.TABLE_NAME, tableName);
-    NameValuePair tableTypeValuePair = new 
BasicNameValuePair(QueryParameters.TABLE_TYPE, tableType.name());
-    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair, 
tableTypeValuePair);
-    return uploadSegment(uri, segmentName, segmentFile, null, parameters, 
DEFAULT_SOCKET_TIMEOUT_MS);
-  }
-
-  /**
    * Upload segment with segment file input stream.
    *
    * Note: table name has to be set as a parameter.
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
deleted file mode 100644
index 84c7a79..0000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.utils;
-
-import com.google.common.base.Preconditions;
-import java.util.Set;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-
-
-// Util functions related to segments.
-public class SegmentUtils {
-  // Returns the partition id of a realtime segment based segment name and 
segment metadata info retrieved via Helix.
-  // Important: The method is costly because it may read data from zookeeper. 
Do not use it in any query execution
-  // path.
-  public static int getRealtimeSegmentPartitionId(String segmentName, String 
realtimeTableName,
-      HelixManager helixManager, String partitionColumn) {
-    // A fast path if the segmentName is a LLC segment name and we can get the 
partition id from the name directly.
-    if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-      return new LLCSegmentName(segmentName).getPartitionGroupId();
-    }
-    // Otherwise, retrieve the partition id from the segment zk metadata. 
Currently only realtime segments from upsert
-    // enabled tables have partition ids in their segment metadata.
-    RealtimeSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
-        .getRealtimeSegmentZKMetadata(helixManager.getHelixPropertyStore(), 
realtimeTableName, segmentName);
-    Preconditions
-        .checkState(segmentZKMetadata != null, "Failed to find segment ZK 
metadata for segment: %s of table: %s",
-            segmentName, realtimeTableName);
-    return getSegmentPartitionIdFromZkMetaData(realtimeTableName, 
segmentZKMetadata, partitionColumn);
-  }
-
-  private static int getSegmentPartitionIdFromZkMetaData(String 
realtimeTableName,
-      RealtimeSegmentZKMetadata segmentZKMetadata, String partitionColumn) {
-    String segmentName = segmentZKMetadata.getSegmentName();
-    Preconditions.checkState(segmentZKMetadata.getPartitionMetadata() != null,
-        "Segment ZK metadata for segment: %s of table: %s does not contain 
partition metadata", segmentName,
-        realtimeTableName);
-
-    ColumnPartitionMetadata partitionMetadata =
-        
segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(partitionColumn);
-    Preconditions.checkState(partitionMetadata != null,
-        "Segment ZK metadata for segment: %s of table: %s does not contain 
partition metadata for column: %s. Check if the table is an upsert table.",
-        segmentName, realtimeTableName, partitionColumn);
-    Set<Integer> partitions = partitionMetadata.getPartitions();
-    Preconditions.checkState(partitions.size() == 1,
-        "Segment ZK metadata for segment: %s of table: %s contains multiple 
partitions for column: %s with %s",
-        segmentName, realtimeTableName, partitionColumn, partitions);
-    return partitions.iterator().next();
-  }
-}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index ce91c01..9d47b27 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -80,8 +80,6 @@ public class ControllerConf extends PinotConfiguration {
         "controller.offline.segment.interval.checker.frequencyInSeconds";
     public static final String 
REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
         "controller.realtime.segment.validation.frequencyInSeconds";
-    public static final String 
REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS =
-        "controller.realtime.segment.validation.initialDelayInSeconds";
     public static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS 
=
         "controller.broker.resource.validation.frequencyInSeconds";
     public static final String 
BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS =
@@ -630,8 +628,7 @@ public class ControllerConf extends PinotConfiguration {
   }
 
   public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
-    return 
getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
-        getPeriodicTaskInitialDelayInSeconds());
+    return getPeriodicTaskInitialDelayInSeconds();
   }
 
   public long getPinotTaskManagerInitialDelaySeconds() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 35a3e07..7584ab0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -184,7 +184,7 @@ public class PinotSegmentUploadDownloadRestletResource {
     return builder.build();
   }
 
-  private SuccessResponse uploadSegment(@Nullable String tableName, TableType 
tableType, FormDataMultiPart multiPart,
+  private SuccessResponse uploadSegment(@Nullable String tableName, 
FormDataMultiPart multiPart,
       boolean enableParallelPushProtection, HttpHeaders headers, Request 
request, boolean moveSegmentToFinalLocation) {
     String uploadTypeStr = null;
     String crypterClassNameInHeader = null;
@@ -248,36 +248,24 @@ public class PinotSegmentUploadDownloadRestletResource {
         LOGGER.info("Uploading a segment {} to table: {}, push type {}, 
(Derived from segment metadata)", segmentName, tableName, uploadType);
       }
 
-      String tableNameWithType;
-      if (tableType == TableType.OFFLINE) {
-        tableNameWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
-      } else {
-        if (!_pinotHelixResourceManager.isUpsertTable(rawTableName)) {
-          throw new UnsupportedOperationException(
-              "Upload segment to non-upsert realtime table is not supported " 
+ rawTableName);
-        }
-        tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
-      }
-
+      String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
       String clientAddress = 
InetAddress.getByName(request.getRemoteAddr()).getHostName();
       LOGGER.info("Processing upload request for segment: {} of table: {} from 
client: {}, ingestion descriptor: {}",
-          segmentName, tableNameWithType, clientAddress, ingestionDescriptor);
+          segmentName, offlineTableName, clientAddress, ingestionDescriptor);
 
-      // Skip segment validation if upload is to an offline table and only 
segment metadata. Skip segment validation for
-      // realtime tables because the feature is experimental and only 
applicable to upsert enabled table currently.
-      if (tableType == TableType.OFFLINE && uploadType != 
FileUploadDownloadClient.FileUploadType.METADATA) {
+      // Skip segment validation if upload only segment metadata
+      if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
         // Validate segment
         new SegmentValidator(_pinotHelixResourceManager, _controllerConf, 
_executor, _connectionManager,
-            _controllerMetrics, 
_leadControllerManager.isLeaderForTable(tableNameWithType))
-            .validateOfflineSegment(tableNameWithType, segmentMetadata, 
tempSegmentDir);
+            _controllerMetrics, 
_leadControllerManager.isLeaderForTable(offlineTableName)).validateOfflineSegment(offlineTableName,
 segmentMetadata, tempSegmentDir);
       }
 
       // Encrypt segment
       String crypterClassNameInTableConfig =
-          
_pinotHelixResourceManager.getCrypterClassNameFromTableConfig(tableNameWithType);
+          
_pinotHelixResourceManager.getCrypterClassNameFromTableConfig(offlineTableName);
       Pair<String, File> encryptionInfo =
           encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, 
uploadedSegmentIsEncrypted,
-              crypterClassNameInHeader, crypterClassNameInTableConfig, 
segmentName, tableNameWithType);
+              crypterClassNameInHeader, crypterClassNameInTableConfig, 
segmentName, tableName);
 
       String crypterClassName = encryptionInfo.getLeft();
       File finalSegmentFile = encryptionInfo.getRight();
@@ -289,17 +277,17 @@ public class PinotSegmentUploadDownloadRestletResource {
       if (!moveSegmentToFinalLocation) {
         LOGGER
             .info("Setting zkDownloadUri: to {} for segment: {} of table: {}, 
skipping move", downloadUri, segmentName,
-                tableNameWithType);
+                offlineTableName);
         zkDownloadUri = downloadUri;
       } else {
-        zkDownloadUri = getZkDownloadURIForSegmentUpload(tableNameWithType, 
segmentName);
+        zkDownloadUri = getZkDownloadURIForSegmentUpload(rawTableName, 
segmentName);
       }
 
       // Zk operations
-      completeZkOperations(enableParallelPushProtection, headers, 
finalSegmentFile, tableNameWithType, segmentMetadata,
+      completeZkOperations(enableParallelPushProtection, headers, 
finalSegmentFile, rawTableName, segmentMetadata,
           segmentName, zkDownloadUri, moveSegmentToFinalLocation, 
crypterClassName);
 
-      return new SuccessResponse("Successfully uploaded segment: " + 
segmentName + " of table: " + tableNameWithType);
+      return new SuccessResponse("Successfully uploaded segment: " + 
segmentName + " of table: " + rawTableName);
     } catch (WebApplicationException e) {
       throw e;
     } catch (Exception e) {
@@ -324,7 +312,7 @@ public class PinotSegmentUploadDownloadRestletResource {
 
   Pair<String, File> encryptSegmentIfNeeded(File tempDecryptedFile, File 
tempEncryptedFile,
       boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment, 
String crypterClassNameInTableConfig,
-      String segmentName, String tableNameWithType) {
+      String segmentName, String tableName) {
 
     boolean segmentNeedsEncryption = 
!Strings.isNullOrEmpty(crypterClassNameInTableConfig);
 
@@ -343,13 +331,13 @@ public class PinotSegmentUploadDownloadRestletResource {
       throw new ControllerApplicationException(LOGGER, String.format(
           "Uploaded segment is encrypted with '%s' while table config requires 
'%s' as crypter "
               + "(segment name = '%s', table name = '%s').", 
crypterUsedInUploadedSegment,
-          crypterClassNameInTableConfig, segmentName, tableNameWithType), 
Response.Status.INTERNAL_SERVER_ERROR);
+          crypterClassNameInTableConfig, segmentName, tableName), 
Response.Status.INTERNAL_SERVER_ERROR);
     }
 
     // encrypt segment
     PinotCrypter pinotCrypter = 
PinotCrypterFactory.create(crypterClassNameInTableConfig);
     LOGGER.info("Using crypter class '{}' for encrypting '{}' to '{}' (segment 
name = '{}', table name = '{}').",
-        crypterClassNameInTableConfig, tempDecryptedFile, tempEncryptedFile, 
segmentName, tableNameWithType);
+        crypterClassNameInTableConfig, tempDecryptedFile, tempEncryptedFile, 
segmentName, tableName);
     pinotCrypter.encrypt(tempDecryptedFile, tempEncryptedFile);
 
     return out;
@@ -386,14 +374,14 @@ public class PinotSegmentUploadDownloadRestletResource {
   }
 
   private void completeZkOperations(boolean enableParallelPushProtection, 
HttpHeaders headers, File uploadedSegmentFile,
-      String tableNameWithType, SegmentMetadata segmentMetadata, String 
segmentName, String zkDownloadURI,
+      String rawTableName, SegmentMetadata segmentMetadata, String 
segmentName, String zkDownloadURI,
       boolean moveSegmentToFinalLocation, String crypter)
       throws Exception {
     URI finalSegmentLocationURI = URIUtils
-        
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), 
tableNameWithType,
+        
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), 
rawTableName,
             URIUtils.encode(segmentName));
     ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, 
_controllerConf, _controllerMetrics);
-    zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, 
finalSegmentLocationURI, uploadedSegmentFile,
+    zkOperator.completeSegmentOperations(rawTableName, segmentMetadata, 
finalSegmentLocationURI, uploadedSegmentFile,
         enableParallelPushProtection, headers, zkDownloadURI, 
moveSegmentToFinalLocation, crypter);
   }
 
@@ -416,13 +404,10 @@ public class PinotSegmentUploadDownloadRestletResource {
   // it keeps it at the downloadURI header that is set. We will not support 
this endpoint going forward.
   public void uploadSegmentAsJson(String segmentJsonStr,
       @ApiParam(value = "Name of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String 
tableName,
-      @ApiParam(value = "Type of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) 
@DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") 
@DefaultValue("false") 
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
 boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), 
null, enableParallelPushProtection,
-              headers, request, false));
+      asyncResponse.resume(uploadSegment(tableName, null, 
enableParallelPushProtection, headers, request, false));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -438,13 +423,10 @@ public class PinotSegmentUploadDownloadRestletResource {
   // For the multipart endpoint, we will always move segment to final location 
regardless of the segment endpoint.
   public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
       @ApiParam(value = "Name of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String 
tableName,
-      @ApiParam(value = "Type of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) 
@DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") 
@DefaultValue("false") 
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
 boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), 
multiPart, enableParallelPushProtection,
-              headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, multiPart, 
enableParallelPushProtection, headers, request, true));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -462,13 +444,10 @@ public class PinotSegmentUploadDownloadRestletResource {
   // endpoint in how it moves the segment to a Pinot-determined final 
directory.
   public void uploadSegmentAsJsonV2(String segmentJsonStr,
       @ApiParam(value = "Name of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String 
tableName,
-      @ApiParam(value = "Type of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) 
@DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") 
@DefaultValue("false") 
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
 boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), 
null, enableParallelPushProtection,
-              headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, null, 
enableParallelPushProtection, headers, request, true));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -484,13 +463,10 @@ public class PinotSegmentUploadDownloadRestletResource {
   // This behavior does not differ from v1 of the same endpoint.
   public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
       @ApiParam(value = "Name of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String 
tableName,
-      @ApiParam(value = "Type of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) 
@DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") 
@DefaultValue("false") 
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
 boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(
-          uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), 
multiPart, enableParallelPushProtection,
-              headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, multiPart, 
enableParallelPushProtection, headers, request, true));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 0c6ad65..7b743c6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -57,36 +57,33 @@ public class ZKOperator {
     _controllerMetrics = controllerMetrics;
   }
 
-  public void completeSegmentOperations(String tableNameWithType, 
SegmentMetadata segmentMetadata,
+  public void completeSegmentOperations(String rawTableName, SegmentMetadata 
segmentMetadata,
       URI finalSegmentLocationURI, File currentSegmentLocation, boolean 
enableParallelPushProtection,
       HttpHeaders headers, String zkDownloadURI, boolean 
moveSegmentToFinalLocation, String crypter)
       throws Exception {
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
-    ZNRecord segmentMetadataZnRecord = 
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, 
segmentName);
+
+    // Brand new segment, not refresh, directly add the segment
+    ZNRecord segmentMetadataZnRecord =
+        
_pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, 
segmentName);
     if (segmentMetadataZnRecord == null) {
-      LOGGER.info("Adding new segment {} from table {}", segmentName, 
tableNameWithType);
+      LOGGER.info("Adding new segment {} from table {}", segmentName, 
rawTableName);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, 
currentSegmentLocation, zkDownloadURI, crypter,
-          tableNameWithType, segmentName, moveSegmentToFinalLocation);
+          rawTableName, segmentName, moveSegmentToFinalLocation);
       return;
     }
 
-    // TODO Allow segment refreshing for realtime tables.
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      throw new ControllerApplicationException(LOGGER,
-          "Refresh existing segment " + segmentName + " for realtime table " + 
tableNameWithType + " is not yet supported ",
-          Response.Status.NOT_IMPLEMENTED
-      );
-    }
+    LOGGER.info("Segment {} from table {} already exists, refreshing if 
necessary", segmentName, rawTableName);
 
-    LOGGER.info("Segment {} from table {} already exists, refreshing if 
necessary", segmentName, tableNameWithType);
     processExistingSegment(segmentMetadata, finalSegmentLocationURI, 
currentSegmentLocation,
-        enableParallelPushProtection, headers, zkDownloadURI, crypter, 
tableNameWithType, segmentName,
+        enableParallelPushProtection, headers, zkDownloadURI, crypter, 
offlineTableName, segmentName,
         segmentMetadataZnRecord, moveSegmentToFinalLocation);
   }
 
   private void processExistingSegment(SegmentMetadata segmentMetadata, URI 
finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, 
HttpHeaders headers, String zkDownloadURI,
-      String crypter, String tableNameWithType, String segmentName, ZNRecord 
znRecord,
+      String crypter, String offlineTableName, String segmentName, ZNRecord 
znRecord,
       boolean moveSegmentToFinalLocation)
       throws Exception {
 
@@ -94,7 +91,7 @@ public class ZKOperator {
     long existingCrc = existingSegmentZKMetadata.getCrc();
 
     // Check if CRC match when IF-MATCH header is set
-    checkCRC(headers, tableNameWithType, segmentName, existingCrc);
+    checkCRC(headers, offlineTableName, segmentName, existingCrc);
 
     // Check segment upload start time when parallel push protection enabled
     if (enableParallelPushProtection) {
@@ -104,12 +101,12 @@ public class ZKOperator {
         if (System.currentTimeMillis() - segmentUploadStartTime > 
_controllerConf.getSegmentUploadTimeoutInMillis()) {
           // Last segment upload does not finish properly, replace the segment
           LOGGER
-              .error("Segment: {} of table: {} was not properly uploaded, 
replacing it", segmentName, tableNameWithType);
+              .error("Segment: {} of table: {} was not properly uploaded, 
replacing it", segmentName, offlineTableName);
           
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED,
 1L);
         } else {
           // Another segment upload is in progress
           throw new ControllerApplicationException(LOGGER,
-              "Another segment upload is in progress for segment: " + 
segmentName + " of table: " + tableNameWithType
+              "Another segment upload is in progress for segment: " + 
segmentName + " of table: " + offlineTableName
                   + ", retry later", Response.Status.CONFLICT);
         }
       }
@@ -117,9 +114,9 @@ public class ZKOperator {
       // Lock the segment by setting the upload start time in ZK
       
existingSegmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
       if (!_pinotHelixResourceManager
-          .updateZkMetadata(tableNameWithType, existingSegmentZKMetadata, 
znRecord.getVersion())) {
+          .updateZkMetadata(offlineTableName, existingSegmentZKMetadata, 
znRecord.getVersion())) {
         throw new ControllerApplicationException(LOGGER,
-            "Failed to lock the segment: " + segmentName + " of table: " + 
tableNameWithType + ", retry later",
+            "Failed to lock the segment: " + segmentName + " of table: " + 
offlineTableName + ", retry later",
             Response.Status.CONFLICT);
       }
     }
@@ -155,9 +152,9 @@ public class ZKOperator {
         // (creation time is not included in the crc)
         
existingSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
         existingSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
-        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
existingSegmentZKMetadata)) {
+        if (!_pinotHelixResourceManager.updateZkMetadata(offlineTableName, 
existingSegmentZKMetadata)) {
           throw new RuntimeException(
-              "Failed to update ZK metadata for segment: " + segmentName + " 
of table: " + tableNameWithType);
+              "Failed to update ZK metadata for segment: " + segmentName + " 
of table: " + offlineTableName);
         }
       } else {
         // New segment is different with the existing one, update ZK metadata 
and refresh the segment
@@ -169,16 +166,16 @@ public class ZKOperator {
           LOGGER.info("Moved segment {} from temp location {} to {}", 
segmentName,
               currentSegmentLocation.getAbsolutePath(), 
finalSegmentLocationURI.getPath());
         } else {
-          LOGGER.info("Skipping segment move, keeping segment {} from table {} 
at {}", segmentName, tableNameWithType,
+          LOGGER.info("Skipping segment move, keeping segment {} from table {} 
at {}", segmentName, offlineTableName,
               zkDownloadURI);
         }
 
         _pinotHelixResourceManager
-            .refreshSegment(tableNameWithType, segmentMetadata, 
existingSegmentZKMetadata, zkDownloadURI, crypter);
+            .refreshSegment(offlineTableName, segmentMetadata, 
existingSegmentZKMetadata, zkDownloadURI, crypter);
       }
     } catch (Exception e) {
-      if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, 
existingSegmentZKMetadata)) {
-        LOGGER.error("Failed to update ZK metadata for segment: {} of table: 
{}", segmentName, tableNameWithType);
+      if (!_pinotHelixResourceManager.updateZkMetadata(offlineTableName, 
existingSegmentZKMetadata)) {
+        LOGGER.error("Failed to update ZK metadata for segment: {} of table: 
{}", segmentName, offlineTableName);
       }
       throw e;
     }
@@ -204,7 +201,7 @@ public class ZKOperator {
   }
 
   private void processNewSegment(SegmentMetadata segmentMetadata, URI 
finalSegmentLocationURI,
-      File currentSegmentLocation, String zkDownloadURI, String crypter, 
String tableNameWithType, String segmentName,
+      File currentSegmentLocation, String zkDownloadURI, String crypter, 
String rawTableName, String segmentName,
       boolean moveSegmentToFinalLocation) {
     // For v1 segment uploads, we will not move the segment
     if (moveSegmentToFinalLocation) {
@@ -214,14 +211,14 @@ public class ZKOperator {
             .info("Moved segment {} from temp location {} to {}", segmentName, 
currentSegmentLocation.getAbsolutePath(),
                 finalSegmentLocationURI.getPath());
       } catch (Exception e) {
-        LOGGER.error("Could not move segment {} from table {} to permanent 
directory", segmentName, tableNameWithType, e);
+        LOGGER.error("Could not move segment {} from table {} to permanent 
directory", segmentName, rawTableName, e);
         throw new RuntimeException(e);
       }
     } else {
-      LOGGER.info("Skipping segment move, keeping segment {} from table {} at 
{}", segmentName, tableNameWithType,
+      LOGGER.info("Skipping segment move, keeping segment {} from table {} at 
{}", segmentName, rawTableName,
           zkDownloadURI);
     }
-    _pinotHelixResourceManager.addNewSegment(tableNameWithType, 
segmentMetadata, zkDownloadURI, crypter);
+    _pinotHelixResourceManager.addNewSegment(rawTableName, segmentMetadata, 
zkDownloadURI, crypter);
   }
 
   private void moveSegmentToPermanentDirectory(File currentSegmentLocation, 
URI finalSegmentLocationURI)
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8441d12..b1a9369 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -86,7 +86,6 @@ import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.messages.TableConfigRefreshMessage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.utils.HashUtil;
@@ -121,7 +120,6 @@ import org.apache.pinot.spi.config.table.TableCustomConfig;
 import org.apache.pinot.spi.config.table.TableStats;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TenantConfig;
-import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.data.Schema;
@@ -1625,110 +1623,72 @@ public class PinotHelixResourceManager {
     return instanceSet;
   }
 
-  public void addNewSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata, String downloadUrl) {
-    addNewSegment(tableNameWithType, segmentMetadata, downloadUrl, null);
+  public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, 
String downloadUrl) {
+    addNewSegment(tableName, segmentMetadata, downloadUrl, null);
   }
 
-  public void addNewSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata, String downloadUrl,
+  public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, 
String downloadUrl,
       @Nullable String crypter) {
     String segmentName = segmentMetadata.getName();
-    InstancePartitionsType instancePartitionsType;
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
     // NOTE: must first set the segment ZK metadata before assigning segment 
to instances because segment assignment
     // might need them to determine the partition of the segment, and server 
will need them to download the segment
-    ZNRecord znRecord;
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      Preconditions.checkState(isUpsertTable(tableNameWithType),
-          "Upload segment " + segmentName + " for non upsert enabled realtime 
table " + tableNameWithType
-              + " is not supported");
-      // In an upsert enabled LLC realtime table, all segments of the same 
partition are collocated on the same server
-      // -- consuming or completed. So it is fine to use CONSUMING as the 
InstancePartitionsType.
-      // TODO When upload segments is open to all realtime tables, we should 
change the type to COMPLETED instead.
-      // In addition, RealtimeSegmentAssignment.assignSegment(..) method 
should be updated so that the method does not
-      // assign segments to CONSUMING instance partition only.
-      instancePartitionsType = InstancePartitionsType.CONSUMING;
-      // Build the realtime segment zk metadata with necessary fields.
-      LLCRealtimeSegmentZKMetadata segmentZKMetadata = new 
LLCRealtimeSegmentZKMetadata();
-      ZKMetadataUtils
-          .updateSegmentMetadata(segmentZKMetadata, segmentMetadata, 
CommonConstants.Segment.SegmentType.REALTIME);
-      segmentZKMetadata.setDownloadUrl(downloadUrl);
-      segmentZKMetadata.setCrypterName(crypter);
-      
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
-      znRecord = segmentZKMetadata.toZNRecord();
-    } else {
-      instancePartitionsType = InstancePartitionsType.OFFLINE;
-      // Build the offline segment zk metadata with necessary fields.
-      OfflineSegmentZKMetadata segmentZKMetadata = new 
OfflineSegmentZKMetadata();
-      ZKMetadataUtils
-          .updateSegmentMetadata(segmentZKMetadata, segmentMetadata, 
CommonConstants.Segment.SegmentType.OFFLINE);
-      segmentZKMetadata.setDownloadUrl(downloadUrl);
-      segmentZKMetadata.setCrypterName(crypter);
-      segmentZKMetadata.setPushTime(System.currentTimeMillis());
-      znRecord = segmentZKMetadata.toZNRecord();
-    }
+    OfflineSegmentZKMetadata offlineSegmentZKMetadata = new 
OfflineSegmentZKMetadata();
+    ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, 
segmentMetadata);
+    offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
+    offlineSegmentZKMetadata.setCrypterName(crypter);
+    offlineSegmentZKMetadata.setPushTime(System.currentTimeMillis());
     String segmentZKMetadataPath =
-        
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, 
segmentName);
+        
ZKMetadataProvider.constructPropertyStorePathForSegment(offlineTableName, 
segmentName);
     Preconditions.checkState(
-        _propertyStore.set(segmentZKMetadataPath, znRecord, 
AccessOption.PERSISTENT),
-        "Failed to set segment ZK metadata for table: " + tableNameWithType + 
", segment: " + segmentName);
-    LOGGER.info("Added segment: {} of table: {} to property store", 
segmentName, tableNameWithType);
-    assignTableSegment(tableNameWithType, segmentName, segmentZKMetadataPath, 
instancePartitionsType);
-  }
-
+        _propertyStore.set(segmentZKMetadataPath, 
offlineSegmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT),
+        "Failed to set segment ZK metadata for table: " + offlineTableName + 
", segment: " + segmentName);
+    LOGGER.info("Added segment: {} of table: {} to property store", 
segmentName, offlineTableName);
 
-  private void assignTableSegment(String tableNameWithType, String 
segmentName, String segmentZKMetadataPath,
-      InstancePartitionsType instancePartitionsType) {
     // Assign instances for the segment and add it into IdealState
     try {
-      TableConfig tableConfig = getTableConfig(tableNameWithType);
+      TableConfig offlineTableConfig = getTableConfig(offlineTableName);
       Preconditions
-          .checkState(tableConfig != null, "Failed to find table config for 
table: " + tableNameWithType);
+          .checkState(offlineTableConfig != null, "Failed to find table config 
for table: " + offlineTableName);
       SegmentAssignment segmentAssignment =
-          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, 
tableConfig);
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, 
offlineTableConfig);
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
Collections
-          .singletonMap(instancePartitionsType, InstancePartitionsUtils
-              .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, 
instancePartitionsType));
-      synchronized (getTableUpdaterLock(tableNameWithType)) {
-        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, 
idealState -> {
+          .singletonMap(InstancePartitionsType.OFFLINE, InstancePartitionsUtils
+              .fetchOrComputeInstancePartitions(_helixZkManager, 
offlineTableConfig, InstancePartitionsType.OFFLINE));
+      synchronized (getTableUpdaterLock(offlineTableName)) {
+        HelixHelper.updateIdealState(_helixZkManager, offlineTableName, 
idealState -> {
           assert idealState != null;
           Map<String, Map<String, String>> currentAssignment = 
idealState.getRecord().getMapFields();
           if (currentAssignment.containsKey(segmentName)) {
             LOGGER.warn("Segment: {} already exists in the IdealState for 
table: {}, do not update", segmentName,
-                tableNameWithType);
+                offlineTableName);
           } else {
             List<String> assignedInstances =
                 segmentAssignment.assignSegment(segmentName, 
currentAssignment, instancePartitionsMap);
             LOGGER.info("Assigning segment: {} to instances: {} for table: 
{}", segmentName, assignedInstances,
-                tableNameWithType);
+                offlineTableName);
             currentAssignment.put(segmentName,
                 SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
           }
           return idealState;
         });
-        LOGGER.info("Added segment: {} to IdealState for table: {}", 
segmentName, tableNameWithType);
+        LOGGER.info("Added segment: {} to IdealState for table: {}", 
segmentName, offlineTableName);
       }
     } catch (Exception e) {
       LOGGER
           .error("Caught exception while adding segment: {} to IdealState for 
table: {}, deleting segment ZK metadata",
-              segmentName, tableNameWithType, e);
+              segmentName, offlineTableName, e);
       if (_propertyStore.remove(segmentZKMetadataPath, 
AccessOption.PERSISTENT)) {
-        LOGGER.info("Deleted segment ZK metadata for segment: {} of table: 
{}", segmentName, tableNameWithType);
+        LOGGER.info("Deleted segment ZK metadata for segment: {} of table: 
{}", segmentName, offlineTableName);
       } else {
         LOGGER
-            .error("Failed to deleted segment ZK metadata for segment: {} of 
table: {}", segmentName, tableNameWithType);
+            .error("Failed to deleted segment ZK metadata for segment: {} of 
table: {}", segmentName, offlineTableName);
       }
       throw e;
     }
   }
 
-  public boolean isUpsertTable(String tableName) {
-    TableConfig realtimeTableConfig = 
getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName));
-    if (realtimeTableConfig == null) {
-      return false;
-    }
-    UpsertConfig upsertConfig = realtimeTableConfig.getUpsertConfig();
-    return ((upsertConfig != null) && upsertConfig.getMode() != 
UpsertConfig.Mode.NONE);
-  }
-
   private Object getTableUpdaterLock(String offlineTableName) {
     return _tableUpdaterLocks[(offlineTableName.hashCode() & 
Integer.MAX_VALUE) % _tableUpdaterLocks.length];
   }
@@ -1757,7 +1717,7 @@ public class PinotHelixResourceManager {
     // ZK metadata to refresh the segment (server will compare the segment ZK 
metadata with the local metadata to decide
     // whether to download the new segment; broker will update the the segment 
partition info & time boundary based on
     // the segment ZK metadata)
-    ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, 
segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE);
+    ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, 
segmentMetadata);
     offlineSegmentZKMetadata.setRefreshTime(System.currentTimeMillis());
     offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
     offlineSegmentZKMetadata.setCrypterName(crypter);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 0e1974a..4f2ee84 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -32,9 +32,8 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.tier.Tier;
-import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
-import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -86,19 +85,15 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
   private HelixManager _helixManager;
   private String _realtimeTableName;
   private int _replication;
-  private String _partitionColumn;
 
   @Override
   public void init(HelixManager helixManager, TableConfig tableConfig) {
     _helixManager = helixManager;
     _realtimeTableName = tableConfig.getTableName();
     _replication = 
tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
-    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
-        tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
-    _partitionColumn = replicaGroupStrategyConfig != null ? 
replicaGroupStrategyConfig.getPartitionColumn() : null;
 
-    LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {}, 
partitionColumn: {} for table: {}",
-        _replication, _partitionColumn, _realtimeTableName);
+    LOGGER.info("Initialized RealtimeSegmentAssignment with replication: {} 
for table: {}", _replication,
+        _realtimeTableName);
   }
 
   @Override
@@ -141,8 +136,7 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
    * Helper method to assign instances for CONSUMING segment based on the 
segment partition id and instance partitions.
    */
   private List<String> assignConsumingSegment(String segmentName, 
InstancePartitions instancePartitions) {
-    int partitionGroupId =
-        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_realtimeTableName, _helixManager, _partitionColumn);
+    int partitionGroupId = new 
LLCSegmentName(segmentName).getPartitionGroupId();
 
     int numReplicaGroups = instancePartitions.getNumReplicaGroups();
     if (numReplicaGroups == 1) {
@@ -180,8 +174,9 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
 
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
@Nullable List<Tier> sortedTiers,
-      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
Configuration config) {
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
+      @Nullable List<Tier> sortedTiers, @Nullable Map<String, 
InstancePartitions> tierInstancePartitionsMap,
+      Configuration config) {
 
     InstancePartitions completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
     InstancePartitions consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
@@ -303,7 +298,8 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
    * Rebalances segments in the current assignment using the 
instancePartitions and returns new assignment
    */
   private Map<String, Map<String, String>> reassignSegments(String 
instancePartitionType,
-      Map<String, Map<String, String>> currentAssignment, InstancePartitions 
instancePartitions, boolean bootstrap) {
+      Map<String, Map<String, String>> currentAssignment, InstancePartitions 
instancePartitions,
+      boolean bootstrap) {
     Map<String, Map<String, String>> newAssignment;
     if (bootstrap) {
       LOGGER.info("Bootstrapping segment assignment for {} segments of table: 
{}", instancePartitionType,
@@ -320,8 +316,8 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
       if (instancePartitions.getNumReplicaGroups() == 1) {
         // Non-replica-group based assignment
 
-        List<String> instances =
-            
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
 _replication);
+        List<String> instances = SegmentAssignmentUtils
+            .getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, 
_replication);
         newAssignment = SegmentAssignmentUtils
             .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, 
instances, _replication);
       } else {
@@ -329,8 +325,7 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
 
         Map<Integer, List<String>> partitionGroupIdToSegmentsMap = new 
HashMap<>();
         for (String segmentName : currentAssignment.keySet()) {
-          int partitionGroupId = SegmentUtils
-              .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, 
_helixManager, _partitionColumn);
+          int partitionGroupId = new 
LLCSegmentName(segmentName).getPartitionGroupId();
           partitionGroupIdToSegmentsMap.computeIfAbsent(partitionGroupId, k -> 
new ArrayList<>()).add(segmentName);
         }
 
@@ -343,7 +338,8 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
         }
 
         newAssignment = SegmentAssignmentUtils
-            .rebalanceReplicaGroupBasedTable(currentAssignment, 
instancePartitions, partitionGroupIdToSegmentsMap);
+            .rebalanceReplicaGroupBasedTable(currentAssignment, 
instancePartitions,
+                partitionGroupIdToSegmentsMap);
       }
     }
     return newAssignment;
@@ -364,8 +360,7 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
       // Replica-group based assignment
 
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId =
-          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_realtimeTableName, _helixManager, _partitionColumn);
+      int segmentPartitionId = new 
LLCSegmentName(segmentName).getPartitionGroupId();
       int numPartitions = instancePartitions.getNumPartitions();
       int partitionGroupId = segmentPartitionId % numPartitions;
       return SegmentAssignmentUtils
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 324ea00..058f574 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -21,8 +21,8 @@ package org.apache.pinot.controller.helix.core.util;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.segment.local.segment.index.metadata.ColumnMetadata;
 import 
org.apache.pinot.segment.local.segment.index.metadata.SegmentMetadataImpl;
@@ -35,24 +35,24 @@ public class ZKMetadataUtils {
   private ZKMetadataUtils() {
   }
 
-  public static void updateSegmentMetadata(SegmentZKMetadata 
segmentZKMetadata, SegmentMetadata segmentMetadata,
-      SegmentType segmentType) {
-    segmentZKMetadata.setSegmentName(segmentMetadata.getName());
-    segmentZKMetadata.setTableName(segmentMetadata.getTableName());
-    segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
-    segmentZKMetadata.setSegmentType(segmentType);
+  public static void updateSegmentMetadata(OfflineSegmentZKMetadata 
offlineSegmentZKMetadata,
+      SegmentMetadata segmentMetadata) {
+    offlineSegmentZKMetadata.setSegmentName(segmentMetadata.getName());
+    offlineSegmentZKMetadata.setTableName(segmentMetadata.getTableName());
+    offlineSegmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
+    offlineSegmentZKMetadata.setSegmentType(SegmentType.OFFLINE);
     if (segmentMetadata.getTimeInterval() != null) {
-      segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
-      segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
-      segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+      offlineSegmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
+      offlineSegmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
+      offlineSegmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
     }
-    segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
-    segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
-    segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
+    offlineSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
+    
offlineSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+    offlineSegmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
     SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
         new 
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
-            segmentZKMetadata.getCustomMap());
-    
segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
+            offlineSegmentZKMetadata.getCustomMap());
+    
offlineSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
 
     // Extract column partition metadata (if any), and set it into segment ZK 
metadata.
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
@@ -73,7 +73,7 @@ public class ZKMetadataUtils {
     }
 
     if (!columnPartitionMap.isEmpty()) {
-      segmentZKMetadata.setPartitionMetadata(new 
SegmentPartitionMetadata(columnPartitionMap));
+      offlineSegmentZKMetadata.setPartitionMetadata(new 
SegmentPartitionMetadata(columnPartitionMap));
     }
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
index 722e551..fac5c18 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
@@ -61,8 +61,7 @@ public class PinotSegmentRestletResourceTest {
     // Upload Segments
     for (int i = 0; i < 5; ++i) {
       SegmentMetadata segmentMetadata = 
SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME);
-      ControllerTestUtils.getHelixResourceManager()
-          
.addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), 
segmentMetadata, "downloadUrl");
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME, 
segmentMetadata, "downloadUrl");
       segmentMetadataTable.put(segmentMetadata.getName(), segmentMetadata);
     }
 
@@ -72,8 +71,7 @@ public class PinotSegmentRestletResourceTest {
     // Add more segments
     for (int i = 0; i < 5; ++i) {
       SegmentMetadata segmentMetadata = 
SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME);
-      ControllerTestUtils.getHelixResourceManager()
-          
.addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), 
segmentMetadata, "downloadUrl");
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME, 
segmentMetadata, "downloadUrl");
       segmentMetadataTable.put(segmentMetadata.getName(), segmentMetadata);
     }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 3c46bda..ba8898e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -54,7 +54,7 @@ public class TableViewsTest {
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(2).build();
     
Assert.assertEquals(ControllerTestUtils.getHelixManager().getInstanceType(), 
InstanceType.CONTROLLER);
     ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
-    
ControllerTestUtils.getHelixResourceManager().addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME),
+    
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
         SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
OFFLINE_SEGMENT_NAME), "downloadUrl");
 
     // Create the hybrid table
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 9f57faf..597c746 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -27,7 +27,6 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -41,7 +40,6 @@ import static org.testng.Assert.fail;
 
 public class ZKOperatorTest {
   private static final String TABLE_NAME = "operatorTestTable";
-  private static final String TABLE_NAME_WITH_TYPE = 
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
   private static final String SEGMENT_NAME = "testSegment";
 
   @BeforeClass
@@ -61,7 +59,7 @@ public class ZKOperatorTest {
     when(segmentMetadata.getCrc()).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(123L);
     HttpHeaders httpHeaders = mock(HttpHeaders.class);
-    zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE, 
segmentMetadata, null, null, false, httpHeaders, "downloadUrl",
+    zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null, 
null, false, httpHeaders, "downloadUrl",
         false, "crypter");
 
     OfflineSegmentZKMetadata segmentZKMetadata =
@@ -77,7 +75,7 @@ public class ZKOperatorTest {
     // Refresh the segment with unmatched IF_MATCH field
     when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123");
     try {
-      zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE, 
segmentMetadata, null, null, false, httpHeaders,
+      zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null, 
null, false, httpHeaders,
           "otherDownloadUrl", false, null);
       fail();
     } catch (Exception e) {
@@ -88,7 +86,7 @@ public class ZKOperatorTest {
     // downloadURL and crypter
     
when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345");
     when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
-    zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE, 
segmentMetadata, null, null, false, httpHeaders,
+    zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null, 
null, false, httpHeaders,
         "otherDownloadUrl", false, "otherCrypter");
     segmentZKMetadata = ControllerTestUtils
         .getHelixResourceManager().getOfflineSegmentZKMetadata(TABLE_NAME, 
SEGMENT_NAME);
@@ -110,7 +108,7 @@ public class ZKOperatorTest {
     // 1 second delay to avoid "org.apache.helix.HelixException: Specified 
EXTERNALVIEW operatorTestTable_OFFLINE is
     // not found!" exception from being thrown sporadically.
     Thread.sleep(1000L);
-    zkOperator.completeSegmentOperations(TABLE_NAME_WITH_TYPE, 
segmentMetadata, null, null, false, httpHeaders,
+    zkOperator.completeSegmentOperations(TABLE_NAME, segmentMetadata, null, 
null, false, httpHeaders,
         "otherDownloadUrl", false, "otherCrypter");
     segmentZKMetadata = ControllerTestUtils
         .getHelixResourceManager().getOfflineSegmentZKMetadata(TABLE_NAME, 
SEGMENT_NAME);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
index 14523a3..726e2d0 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
@@ -70,7 +70,7 @@ public class ControllerInstanceToggleTest {
 
     // Add segments
     for (int i = 0; i < ControllerTestUtils.NUM_SERVER_INSTANCES; i++) {
-      
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
+      
ControllerTestUtils.getHelixResourceManager().addNewSegment(RAW_TABLE_NAME,
           SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), 
"downloadUrl");
       Assert.assertEquals(
           ControllerTestUtils
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
index 929d651..616b491 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
@@ -26,7 +26,6 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeClass;
@@ -65,9 +64,8 @@ public class ControllerSentinelTestV2 {
       Assert.assertEquals(
           ControllerTestUtils
               
.getHelixAdmin().getResourceIdealState(ControllerTestUtils.getHelixClusterName(),
 TABLE_NAME + "_OFFLINE").getNumPartitions(), i);
-      ControllerTestUtils.getHelixResourceManager()
-          
.addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
-              SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), 
"downloadUrl");
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME, 
SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME),
+          "downloadUrl");
       Assert.assertEquals(
           ControllerTestUtils
               
.getHelixAdmin().getResourceIdealState(ControllerTestUtils.getHelixClusterName(),
 TABLE_NAME + "_OFFLINE").getNumPartitions(),
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index ea5326e..e1fc87b 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -18,23 +18,16 @@
  */
 package org.apache.pinot.controller.helix;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerTestUtils;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
-import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
@@ -44,30 +37,15 @@ import org.testng.annotations.Test;
 
 
 public class PinotResourceManagerTest {
-  private static final String OFFLINE_TABLE_NAME = 
"offlineResourceManagerTestTable_OFFLINE";
-  private static final String REALTIME_TABLE_NAME = 
"realtimeResourceManagerTestTable_REALTIME";
-  private static final String NUM_REPLICAS_STRING = "2";
-  private static final String PARTITION_COLUMN = "Partition_Column";
+  private static final String TABLE_NAME = "resourceManagerTestTable";
 
   @BeforeClass
   public void setUp() throws Exception {
     ControllerTestUtils.setupClusterAndValidate();
 
-    // Adding an offline table
-    TableConfig offlineTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).build();
-    ControllerTestUtils.getHelixResourceManager().addTable(offlineTableConfig);
-
-    // Adding an upsert enabled realtime table which consumes from a stream 
with 2 partitions
-    Schema dummySchema = 
ControllerTestUtils.createDummySchema(REALTIME_TABLE_NAME);
-    ControllerTestUtils.addSchema(dummySchema);
-    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
-    TableConfig realtimeTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setStreamConfigs(streamConfigs).
-        
setTableName(REALTIME_TABLE_NAME).setSchemaName(dummySchema.getSchemaName()).build();
-    
realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING);
-    realtimeTableConfig.getValidationConfig()
-        .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
-    realtimeTableConfig.setUpsertConfig(new 
UpsertConfig((UpsertConfig.Mode.FULL)));
-    
ControllerTestUtils.getHelixResourceManager().addTable(realtimeTableConfig);
+    // Adding table
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
   }
 
   @Test
@@ -77,21 +55,21 @@ public class PinotResourceManagerTest {
 
     // Segment ZK metadata does not exist
     Assert.assertFalse(
-        
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME
 + "_OFFLINE", segmentZKMetadata, 0));
+        
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME + 
"_OFFLINE", segmentZKMetadata, 0));
 
     // Set segment ZK metadata
     Assert.assertTrue(
-        
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME
 + "_OFFLINE", segmentZKMetadata));
+        
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME + 
"_OFFLINE", segmentZKMetadata));
 
     // Update ZK metadata
     Assert.assertEquals(
-        
ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME
 + "_OFFLINE", "testSegment").getVersion(), 0);
+        
ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(TABLE_NAME
 + "_OFFLINE", "testSegment").getVersion(), 0);
     Assert.assertTrue(
-        
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME
 + "_OFFLINE", segmentZKMetadata, 0));
+        
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME + 
"_OFFLINE", segmentZKMetadata, 0));
     Assert.assertEquals(
-        
ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME
 + "_OFFLINE", "testSegment").getVersion(), 1);
+        
ControllerTestUtils.getHelixResourceManager().getSegmentMetadataZnRecord(TABLE_NAME
 + "_OFFLINE", "testSegment").getVersion(), 1);
     Assert.assertFalse(
-        
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(OFFLINE_TABLE_NAME
 + "_OFFLINE", segmentZKMetadata, 0));
+        
ControllerTestUtils.getHelixResourceManager().updateZkMetadata(TABLE_NAME + 
"_OFFLINE", segmentZKMetadata, 0));
   }
 
   /**
@@ -104,12 +82,11 @@ public class PinotResourceManagerTest {
 
   @Test
   public void testBasicAndConcurrentAddingAndDeletingSegments() throws 
Exception {
-    final String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME);
+    final String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
 
     // Basic add/delete case
     for (int i = 1; i <= 2; i++) {
-      ControllerTestUtils.getHelixResourceManager().addNewSegment(
-          OFFLINE_TABLE_NAME, 
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME),
+      ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME, 
SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME),
           "downloadUrl");
     }
     IdealState idealState = ControllerTestUtils
@@ -131,8 +108,8 @@ public class PinotResourceManagerTest {
         @Override
         public void run() {
           for (int i = 0; i < 10; i++) {
-            
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
-                
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), 
"downloadUrl");
+            
ControllerTestUtils.getHelixResourceManager().addNewSegment(TABLE_NAME,
+                SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), 
"downloadUrl");
           }
         }
       });
@@ -161,54 +138,6 @@ public class PinotResourceManagerTest {
     Assert.assertEquals(idealState.getPartitionSet().size(), 0);
   }
 
-  @Test
-  public void testAddingRealtimeTableSegmentsWithPartitionIdInZkMetadata() {
-    // Add three segments: two from partition 0 and 1 from partition 1;
-    String partition0Segment0 = "realtimeResourceManagerTestTable__aa";
-    String partition0Segment1 = "realtimeResourceManagerTestTable__bb";
-    String partition1Segment1 = "realtimeResourceManagerTestTable__cc";
-    
ControllerTestUtils.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME,
 SegmentMetadataMockUtils
-            .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, 
partition0Segment0, PARTITION_COLUMN, 0),
-        "downloadUrl");
-    
ControllerTestUtils.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME,
 SegmentMetadataMockUtils
-            .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, 
partition0Segment1, PARTITION_COLUMN, 0),
-        "downloadUrl");
-    
ControllerTestUtils.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME,
 SegmentMetadataMockUtils
-            .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, 
partition1Segment1, PARTITION_COLUMN, 1),
-        "downloadUrl");
-    Map<String, Integer> segment2PartitionId = new HashMap<>();
-    segment2PartitionId.put(partition0Segment0, 0);
-    segment2PartitionId.put(partition0Segment1, 0);
-    segment2PartitionId.put(partition1Segment1, 1);
-
-    IdealState idealState = ControllerTestUtils.getHelixAdmin()
-        .getResourceIdealState(ControllerTestUtils.getHelixClusterName(),
-            TableNameBuilder.REALTIME.tableNameWithType(REALTIME_TABLE_NAME));
-    Set<String> segments = idealState.getPartitionSet();
-    Assert.assertEquals(segments.size(), 5);
-    Assert.assertTrue(segments.contains(partition0Segment0));
-    Assert.assertTrue(segments.contains(partition0Segment1));
-    Assert.assertTrue(segments.contains(partition1Segment1));
-
-    // Check the segments of the same partition is assigned to the same set of 
servers.
-    Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
-    for (String segment : segments) {
-      Integer partitionId;
-      if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
-        partitionId = new LLCSegmentName(segment).getPartitionGroupId();
-      } else {
-        partitionId = segment2PartitionId.get(segment);
-      }
-      Assert.assertNotNull(partitionId);
-      Set<String> instances = idealState.getInstanceSet(segment);
-      if (segmentAssignment.containsKey(partitionId)) {
-        Assert.assertEquals(instances, segmentAssignment.get(partitionId));
-      } else {
-        segmentAssignment.put(partitionId, instances);
-      }
-    }
-  }
-
   @AfterClass
   public void tearDown() {
     ControllerTestUtils.cleanup();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 196633a..e2393e3 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -105,7 +105,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     // Add the segments
     int numSegments = 10;
     for (int i = 0; i < numSegments; i++) {
-      _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+      _helixResourceManager.addNewSegment(RAW_TABLE_NAME,
           SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, 
SEGMENT_NAME_PREFIX + i), null);
     }
     Map<String, Map<String, String>> oldSegmentAssignment =
@@ -343,7 +343,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
     // keep decreasing end time from today in steps of 3. 3 segments don't 
move. 3 segment on tierA. 4 segments on tierB
     for (int i = 0; i < numSegments; i++) {
-      _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME, 
SegmentMetadataMockUtils
+      _helixResourceManager.addNewSegment(TIERED_TABLE_NAME, 
SegmentMetadataMockUtils
           .mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME, 
SEGMENT_NAME_PREFIX + i, nowInDays), null);
       nowInDays -= 3;
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index cc7ecaf..6d52c03 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -72,7 +72,7 @@ public class RetentionManagerTest {
     for (int i = 0; i < numOlderSegments; ++i) {
       SegmentMetadata segmentMetadata = mockSegmentMetadata(pastTimeStamp, 
pastTimeStamp, timeUnit);
       OfflineSegmentZKMetadata offlineSegmentZKMetadata = new 
OfflineSegmentZKMetadata();
-      ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, 
segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE);
+      ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, 
segmentMetadata);
       metadataList.add(offlineSegmentZKMetadata);
       removedSegments.add(offlineSegmentZKMetadata.getSegmentName());
     }
@@ -81,7 +81,7 @@ public class RetentionManagerTest {
       SegmentMetadata segmentMetadata =
           mockSegmentMetadata(dayAfterTomorrowTimeStamp, 
dayAfterTomorrowTimeStamp, timeUnit);
       OfflineSegmentZKMetadata offlineSegmentZKMetadata = new 
OfflineSegmentZKMetadata();
-      ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, 
segmentMetadata, CommonConstants.Segment.SegmentType.OFFLINE);
+      ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, 
segmentMetadata);
       metadataList.add(offlineSegmentZKMetadata);
     }
     final TableConfig tableConfig = createOfflineTableConfig();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index 95e400a..1e0e210 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -19,12 +19,9 @@
 package org.apache.pinot.controller.utils;
 
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.segment.local.partition.MurmurPartitionFunction;
 import org.apache.pinot.segment.local.segment.index.metadata.ColumnMetadata;
 import 
org.apache.pinot.segment.local.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -78,7 +75,6 @@ public class SegmentMetadataMockUtils {
     ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
     Set<Integer> partitions = Collections.singleton(partitionNumber);
     when(columnMetadata.getPartitions()).thenReturn(partitions);
-    when(columnMetadata.getPartitionFunction()).thenReturn(new 
MurmurPartitionFunction(5));
 
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
     if (columnName != null) {
@@ -87,10 +83,6 @@ public class SegmentMetadataMockUtils {
     when(segmentMetadata.getTableName()).thenReturn(tableName);
     when(segmentMetadata.getName()).thenReturn(segmentName);
     when(segmentMetadata.getCrc()).thenReturn("0");
-
-    Map<String, ColumnMetadata> columnMetadataMap = new HashMap<>();
-    columnMetadataMap.put(columnName, columnMetadata);
-    when(segmentMetadata.getColumnMetadataMap()).thenReturn(columnMetadataMap);
     return segmentMetadata;
   }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index 1296e38..86dbaf8 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -50,7 +50,7 @@ import static org.testng.Assert.assertEquals;
  */
 public class ValidationManagerTest {
   private static final String TEST_TABLE_NAME = "validationTable";
-  private static final String OFFLINE_TEST_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME);
+  private static final String TEST_TABLE_TWO = "validationTable2";
   private static final String TEST_SEGMENT_NAME = "testSegment";
 
   private TableConfig _offlineTableConfig;
@@ -68,7 +68,7 @@ public class ValidationManagerTest {
   public void testPushTimePersistence() {
     SegmentMetadata segmentMetadata = 
SegmentMetadataMockUtils.mockSegmentMetadata(TEST_TABLE_NAME, 
TEST_SEGMENT_NAME);
 
-    
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TEST_TABLE_NAME,
 segmentMetadata, "downloadUrl");
+    
ControllerTestUtils.getHelixResourceManager().addNewSegment(TEST_TABLE_NAME, 
segmentMetadata, "downloadUrl");
     OfflineSegmentZKMetadata offlineSegmentZKMetadata =
         
ControllerTestUtils.getHelixResourceManager().getOfflineSegmentZKMetadata(TEST_TABLE_NAME,
 TEST_SEGMENT_NAME);
     long pushTime = offlineSegmentZKMetadata.getPushTime();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 5d4d43d..83beeb2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -45,7 +45,6 @@ import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.common.utils.SegmentName;
-import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.BaseTableDataManager;
@@ -263,24 +262,24 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
     Preconditions.checkNotNull(schema);
 
-    File segmentDir = new File(_indexDir, segmentName);
+    File indexDir = new File(_indexDir, segmentName);
     // Restart during segment reload might leave segment in inconsistent state 
(index directory might not exist but
     // segment backup directory existed), need to first try to recover from 
reload failure before checking the existence
     // of the index directory and loading segment from it
-    LoaderUtils.reloadFailureRecovery(segmentDir);
+    LoaderUtils.reloadFailureRecovery(indexDir);
 
     boolean isLLCSegment = 
SegmentName.isLowLevelConsumerSegmentName(segmentName);
-    if (segmentDir.exists()) {
+    if (indexDir.exists()) {
       // Segment already exists on disk
       if (realtimeSegmentZKMetadata.getStatus() == Status.DONE) {
         // Metadata has been committed, load the local segment
         try {
-          addSegment(ImmutableSegmentLoader.load(segmentDir, 
indexLoadingConfig, schema));
+          addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
schema));
           return;
         } catch (Exception e) {
           if (isLLCSegment) {
             // For LLC and segments, delete the local copy and download a new 
copy from the controller
-            FileUtils.deleteQuietly(segmentDir);
+            FileUtils.deleteQuietly(indexDir);
             if (e instanceof V3RemoveIndexException) {
               _logger.info("Unable to remove index from V3 format segment: {}, 
downloading a new copy", segmentName, e);
             } else {
@@ -294,18 +293,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
         }
       } else {
         // Metadata has not been committed, delete the local segment
-        FileUtils.deleteQuietly(segmentDir);
+        FileUtils.deleteQuietly(indexDir);
       }
-    } else if (realtimeSegmentZKMetadata.getStatus() == Status.UPLOADED) {
-      // The segment is uploaded to an upsert enabled realtime table. Download 
the segment and load.
-      Preconditions.checkArgument(realtimeSegmentZKMetadata instanceof 
LLCRealtimeSegmentZKMetadata,
-          "Upload segment is not LLC segment");
-      String downURL = 
((LLCRealtimeSegmentZKMetadata)realtimeSegmentZKMetadata).getDownloadUrl();
-      Preconditions.checkNotNull(downURL, "Upload segment metadata has no 
download url");
-      downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, downURL);
-      _logger.info("Downloaded, untarred and add segment {} of table {} from 
{}", segmentName, tableConfig.getTableName(),
-          downURL);
-      return;
     }
 
     // Start a new consuming segment or download the segment from the 
controller
@@ -361,8 +350,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     columnToReaderMap.put(_timeColumnName, new 
PinotSegmentColumnReader(immutableSegment, _timeColumnName));
     int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
     String segmentName = immutableSegment.getSegmentName();
-    int partitionGroupId = SegmentUtils
-        .getRealtimeSegmentPartitionId(segmentName, this.getTableName(), 
_helixManager, _primaryKeyColumns.get(0));
+    int partitionGroupId = new 
LLCSegmentName(immutableSegment.getSegmentName()).getPartitionGroupId();
     PartitionUpsertMetadataManager partitionUpsertMetadataManager =
         
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
     int numPrimaryKeyColumns = _primaryKeyColumns.size();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 23efd0e..5486a43 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -38,15 +38,10 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
-import org.apache.pinot.spi.config.table.RoutingConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfig;
@@ -365,26 +360,6 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   }
 
   /**
-   * Creates a new Upsert enabled table config.
-   */
-  protected TableConfig createUpsertTableConfig(File sampleAvroFile, String 
primaryKeyColumn, int numPartitions) {
-    AvroFileSchemaKafkaAvroMessageDecoder.avroFile = sampleAvroFile;
-    Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new 
HashMap<>();
-    columnPartitionConfigMap.put(primaryKeyColumn, new 
ColumnPartitionConfig("Murmur", numPartitions));
-
-    return new 
TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName())
-        .setTimeColumnName(getTimeColumnName())
-        
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-        
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
-        
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setLLC(useLlc())
-        
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled())
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
-        .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap))
-        .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
-        .setUpsertConfig(new UpsertConfig((UpsertConfig.Mode.FULL))).build();
-  }
-
-  /**
    * Returns the REALTIME table config in the cluster.
    */
   protected TableConfig getRealtimeTableConfig() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
index 87eafb6..cc01ae6 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
@@ -182,9 +182,9 @@ public class BasicAuthRealtimeIntegrationTest extends 
BaseClusterIntegrationTest
     // download and sanity-check size of offline segment(s)
     for (int i = 0; i < offlineSegments.size(); i++) {
       String segment = offlineSegments.get(i).asText();
-      Assert.assertTrue(sendGetRequest(_controllerRequestURLBuilder
-          
.forSegmentDownload(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()), 
segment), AUTH_HEADER)
-          .length() > 200000); // download segment
+      Assert.assertTrue(
+          
sendGetRequest(_controllerRequestURLBuilder.forSegmentDownload(getTableName(), 
segment), AUTH_HEADER).length()
+              > 200000); // download segment
     }
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
deleted file mode 100644
index 0633d52..0000000
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import java.io.File;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import org.apache.helix.model.IdealState;
-import org.apache.http.HttpStatus;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-
-public class UpsertTableSegmentUploadIntegrationTest extends 
BaseClusterIntegrationTestSet {
-  private static final int NUM_BROKERS = 1;
-  private static final int NUM_SERVERS = 2;
-  // Segment 1 contains records of pk value 100000
-  private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
-  // Segment 2 contains records of pk value 100001
-  private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
-  // Segment 3 contains records of pk value 100000
-  private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";
-  private static final String PRIMARY_KEY_COL = "clientId";
-  private static final String TABLE_NAME_WITH_TYPE = "mytable_REALTIME";
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    // Start the Pinot cluster
-    startZk();
-    // Start a customized controller with more frequent realtime segment 
validation
-    startController();
-    startBrokers(getNumBrokers());
-    startServers(NUM_SERVERS);
-
-    // Start Kafka
-    startKafka();
-
-    // Create and upload the schema.
-    Schema schema = createSchema();
-    addSchema(schema);
-
-    // Unpack the Avro files
-    List<File> avroFiles = unpackAvroData(_tempDir);
-
-    // Push data to Kafka
-    pushAvroIntoKafka(avroFiles);
-    // Create and upload the table config
-    TableConfig upsertTableConfig = createUpsertTableConfig(avroFiles.get(0), 
PRIMARY_KEY_COL, getNumKafkaPartitions());
-    addTableConfig(upsertTableConfig);
-
-    // Create and upload segments
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 
upsertTableConfig, schema, 0, _segmentDir, _tarDir);
-    uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
-
-    // Wait for all documents loaded
-    waitForAllDocsLoaded(600_000L);
-  }
-
-  @Override
-  protected String getSchemaFileName() {
-    return "upsert_table_test.schema";
-  }
-
-  @Override
-  protected String getSchemaName() {
-    return "upsertSchema";
-  }
-
-  @Override
-  protected String getAvroTarFileName() {
-    return "upsert_test.tar.gz";
-  }
-
-  @Override
-  protected boolean useLlc() {
-    return true;
-  }
-
-  protected int getNumBrokers() {
-    return NUM_BROKERS;
-  }
-
-  @Override
-  protected long getCountStarResult() {
-    // Three distinct records are expected with pk values of 100000, 100001, 
100002
-    return 3;
-  }
-
-  @Override
-  protected String getPartitionColumn() {
-    return PRIMARY_KEY_COL;
-  }
-
-  @Override
-  protected void startController() {
-    Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
-    // Perform realtime segment validation every second with 1 second initial 
delay.
-    controllerConfig
-        
.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS,
 1);
-    
controllerConfig.put(ControllerConf.ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
 1);
-    controllerConfig
-        
.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
 1);
-    startController(controllerConfig);
-  }
-
-  @Test
-  public void testSegmentAssignment()
-      throws Exception {
-    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, 
TABLE_NAME_WITH_TYPE);
-    Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
-    verifyTableIdealStates(idealState);
-    // Wait 3 seconds to let the realtime validation thread to run.
-    Thread.sleep(3000);
-    // Verify the result again.
-    Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
-    verifyTableIdealStates(idealState);
-  }
-
-  private void verifyTableIdealStates(IdealState idealState) {
-    // Verify various ideal state properties
-    Set<String> segments = idealState.getPartitionSet();
-    Assert.assertEquals(segments.size(), 5);
-    Map<String, Integer> segment2PartitionId = new HashMap<>();
-    segment2PartitionId.put(UPLOADED_SEGMENT_1, 0);
-    segment2PartitionId.put(UPLOADED_SEGMENT_2, 1);
-    segment2PartitionId.put(UPLOADED_SEGMENT_3, 1);
-
-    // Verify that all segments of the same partition are mapped to the same 
single server.
-    Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
-    for (String segment : segments) {
-      Integer partitionId;
-      if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
-        partitionId = new LLCSegmentName(segment).getPartitionGroupId();
-      } else {
-        partitionId = segment2PartitionId.get(segment);
-      }
-      Assert.assertNotNull(partitionId);
-      Set<String> instances = idealState.getInstanceSet(segment);
-      Assert.assertEquals(1, instances.size());
-      if (segmentAssignment.containsKey(partitionId)) {
-        Assert.assertEquals(instances, segmentAssignment.get(partitionId));
-      } else {
-        segmentAssignment.put(partitionId, instances);
-      }
-    }
-  }
-
-  private void uploadSegments(String tableName, TableType tableType, File 
tarDir)
-      throws Exception {
-    File[] segmentTarFiles = tarDir.listFiles();
-    assertNotNull(segmentTarFiles);
-    int numSegments = segmentTarFiles.length;
-    assertTrue(numSegments > 0);
-
-    URI uploadSegmentHttpURI = 
FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
-    try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient()) {
-      if (numSegments == 1) {
-        File segmentTarFile = segmentTarFiles[0];
-        assertEquals(fileUploadDownloadClient
-            .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), 
segmentTarFile, tableName, tableType)
-            .getStatusCode(), HttpStatus.SC_OK);
-      } else {
-        // Upload all segments in parallel
-        ExecutorService executorService = 
Executors.newFixedThreadPool(numSegments);
-        List<Future<Integer>> futures = new ArrayList<>(numSegments);
-        for (File segmentTarFile : segmentTarFiles) {
-          futures.add(executorService.submit(() -> {
-            return fileUploadDownloadClient
-                .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), 
segmentTarFile, tableName, tableType)
-                .getStatusCode();
-          }));
-        }
-        executorService.shutdown();
-        for (Future<Integer> future : futures) {
-          assertEquals((int) future.get(), HttpStatus.SC_OK);
-        }
-      }
-    }
-  }
-}
diff --git 
a/pinot-integration-tests/src/test/resources/upsert_table_test.schema 
b/pinot-integration-tests/src/test/resources/upsert_table_test.schema
deleted file mode 100644
index 3f656f7..0000000
--- a/pinot-integration-tests/src/test/resources/upsert_table_test.schema
+++ /dev/null
@@ -1,33 +0,0 @@
-{
-  "dimensionFieldSpecs": [
-    {
-      "dataType": "INT",
-      "singleValueField": true,
-      "name": "clientId"
-    },
-    {
-      "dataType": "STRING",
-      "singleValueField": true,
-      "name": "city"
-    },
-    {
-      "dataType": "STRING",
-      "singleValueField": true,
-      "name": "description"
-    },
-    {
-      "dataType": "INT",
-      "singleValueField": true,
-      "name": "salary"
-    }
-  ],
-  "timeFieldSpec": {
-    "incomingGranularitySpec": {
-      "timeType": "DAYS",
-      "dataType": "INT",
-      "name": "DaysSinceEpoch"
-    }
-  },
-  "primaryKeyColumns": ["clientId"],
-  "schemaName": "upsertSchema"
-}
diff --git a/pinot-integration-tests/src/test/resources/upsert_test.tar.gz 
b/pinot-integration-tests/src/test/resources/upsert_test.tar.gz
deleted file mode 100644
index 0bd0dc8..0000000
Binary files a/pinot-integration-tests/src/test/resources/upsert_test.tar.gz 
and /dev/null differ
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 8e02f85..9289f9c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -118,8 +118,6 @@ public class PartitionUpsertMetadataManager {
             // timestamp, but the segment has a larger sequence number (the 
segment is newer than the current segment).
             if (recordInfo._timestamp > currentRecordLocation.getTimestamp() 
|| (
                 recordInfo._timestamp == currentRecordLocation.getTimestamp()
-                    && 
LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
-                    && 
LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName())
                     && LLCSegmentName.getSequenceNumber(segmentName) > 
LLCSegmentName
                     
.getSequenceNumber(currentRecordLocation.getSegmentName()))) {
               
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 534a3a0..6a5d74a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -394,12 +394,7 @@ public class CommonConstants {
   public static class Segment {
     public static class Realtime {
       public enum Status {
-        // Means the segment is in CONSUMING state.
-        IN_PROGRESS,
-        // Means the segment is in ONLINE state (segment completed consuming 
and has been saved in segment store).
-        DONE,
-        // Means the segment is uploaded to a Pinot controller by an external 
party.
-        UPLOADED
+        IN_PROGRESS, DONE
       }
 
       /**
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index 5afec65..5588a8c 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -329,9 +329,10 @@ public class PerfBenchmarkDriver {
    *
    * @param segmentMetadata segment metadata.
    */
-  public void addSegment(String tableNameWithType, SegmentMetadata 
segmentMetadata) {
+  public void addSegment(String tableName, SegmentMetadata segmentMetadata) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
     _helixResourceManager
-        .addNewSegment(tableNameWithType, segmentMetadata, "http://" + 
_controllerAddress + "/" + segmentMetadata.getName());
+        .addNewSegment(rawTableName, segmentMetadata, "http://" + 
_controllerAddress + "/" + segmentMetadata.getName());
   }
 
   public static void waitForExternalViewUpdate(String zkAddress, final String 
clusterName, long timeoutInMilliseconds) {
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java
index 8f2796e..a785a92 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkRunner.java
@@ -62,7 +62,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand 
implements Command
   @Option(name = "-timeoutInSeconds", required = false, metaVar = "<int>", 
usage = "Timeout in seconds for batch load (default 60).")
   private int _timeoutInSeconds = 60;
 
-  @Option(name = "-tableNames", required = false, metaVar = "<String>", usage 
= "Comma separated table names with types to be loaded (non-batch load).")
+  @Option(name = "-tableNames", required = false, metaVar = "<String>", usage 
= "Comma separated table names to be loaded (non-batch load).")
   private String _tableNames;
 
   @Option(name = "-invertedIndexColumns", required = false, metaVar = 
"<String>", usage = "Comma separated inverted index columns to be created 
(non-batch load).")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to