This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new decc4eba2f Properly handle shutdown of TableDataManager (#11380)
decc4eba2f is described below

commit decc4eba2f5f1f4e13ee5f97a62d51518063a7de
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Aug 18 18:39:54 2023 -0700

    Properly handle shutdown of TableDataManager (#11380)
---
 .../helix/core/PinotHelixResourceManager.java      | 170 ++++++---------------
 .../core/data/manager/BaseTableDataManager.java    |  76 ++++++---
 .../core/data/manager/InstanceDataManager.java     |   6 -
 .../manager/offline/DimensionTableDataManager.java |  10 +-
 .../manager/offline/OfflineTableDataManager.java   |   1 +
 .../manager/realtime/RealtimeTableDataManager.java |  29 ++--
 .../tests/BaseClusterIntegrationTest.java          |  31 ++--
 .../tests/BaseClusterIntegrationTestSet.java       |  20 ---
 .../tests/BaseRealtimeClusterIntegrationTest.java  |   2 +-
 .../tests/OfflineClusterIntegrationTest.java       |   8 +-
 .../local/data/manager/TableDataManager.java       |   4 +-
 .../starter/helix/HelixInstanceDataManager.java    |  82 +++++-----
 .../SegmentOnlineOfflineStateModelFactory.java     | 125 +++++++++------
 13 files changed, 266 insertions(+), 298 deletions(-)
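
The heart of this change is making the table data manager lifecycle idempotent: start() and shutDown() become synchronized, shutDown() is guarded by a _shutDown flag, and mutating operations (addSegment, reloadSegment, addOrReplaceSegment, tryLoadExistingSegment) fail fast once the manager is shut down, while removeSegment is still allowed so a deleted table can drop its segments. A minimal sketch of that guard pattern, with simplified names rather than the actual BaseTableDataManager from the diff below:

    // Minimal sketch of the shutdown-guard pattern in the diff below (names simplified).
    public abstract class GuardedTableDataManager {
      protected volatile boolean _shutDown;

      // Second and later calls are no-ops, so table deletion and Helix state
      // transitions can both trigger shutdown without racing each other.
      public synchronized void shutDown() {
        if (_shutDown) {
          return;
        }
        _shutDown = true;
        doShutdown();
      }

      // Mutating operations fail fast once the manager is shut down.
      public void addSegment(String segmentName) {
        if (_shutDown) {
          throw new IllegalStateException("Already shut down, cannot add segment: " + segmentName);
        }
        // ... load and register the segment ...
      }

      protected abstract void doShutdown();
    }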

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d584136740..1974d952b5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1984,62 +1984,7 @@ public class PinotHelixResourceManager {
   }
 
   public void deleteOfflineTable(String tableName, @Nullable String retentionPeriod) {
-    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
-    LOGGER.info("Deleting table {}: Start", offlineTableName);
-
-    // Remove the table from brokerResource
-    HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, offlineTableName);
-    LOGGER.info("Deleting table {}: Removed from broker resource", offlineTableName);
-
-    // Drop the table on servers
-    // TODO: Make this api idempotent and blocking by waiting for externalview to converge on controllers
-    //      instead of servers. This is because if externalview gets updated with significant delay,
-    //      we may have the race condition for table recreation that the new table will use the old states
-    //      (old table data manager) on the servers.
-    //      Steps needed:
-    //      1. Drop the helix resource first (set idealstate as null)
-    //      2. Wait for the externalview to converge
-    //      3. Get servers for the tenant, and send delete table message to these servers
-    deleteTableOnServer(offlineTableName);
-
-    // Drop the table
-    if (_helixAdmin.getResourcesInCluster(_helixClusterName).contains(offlineTableName)) {
-      _helixAdmin.dropResource(_helixClusterName, offlineTableName);
-      LOGGER.info("Deleting table {}: Removed helix table resource", offlineTableName);
-    }
-
-    // Remove all stored segments for the table
-    Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
-    _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFromPropertyStore(offlineTableName),
-        retentionPeriodMs);
-    LOGGER.info("Deleting table {}: Removed stored segments", offlineTableName);
-
-    // Remove segment metadata
-    ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore, offlineTableName);
-    LOGGER.info("Deleting table {}: Removed segment metadata", offlineTableName);
-
-    // Remove instance partitions
-    InstancePartitionsUtils.removeInstancePartitions(_propertyStore, offlineTableName);
-    LOGGER.info("Deleting table {}: Removed instance partitions", offlineTableName);
-
-    // Remove tier instance partitions
-    InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, offlineTableName);
-    LOGGER.info("Deleting table {}: Removed tier instance partitions", offlineTableName);
-
-    // Remove segment lineage
-    SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, offlineTableName);
-    LOGGER.info("Deleting table {}: Removed segment lineage", offlineTableName);
-
-    // Remove task related metadata
-    MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, offlineTableName);
-    LOGGER.info("Deleting table {}: Removed all minion task metadata", offlineTableName);
-
-    // Remove table config
-    // this should always be the last step for deletion to avoid race condition in table re-create.
-    ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, offlineTableName);
-    LOGGER.info("Deleting table {}: Removed table config", offlineTableName);
-
-    LOGGER.info("Deleting table {}: Finish", offlineTableName);
+    deleteTable(tableName, TableType.OFFLINE, retentionPeriod);
   }
 
   public void deleteRealtimeTable(String tableName) {
@@ -2047,73 +1992,64 @@ public class PinotHelixResourceManager {
   }
 
   public void deleteRealtimeTable(String tableName, @Nullable String retentionPeriod) {
-    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
-    LOGGER.info("Deleting table {}: Start", realtimeTableName);
+    deleteTable(tableName, TableType.REALTIME, retentionPeriod);
+  }
+
+  public void deleteTable(String tableName, TableType tableType, @Nullable String retentionPeriod) {
+    String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
+    LOGGER.info("Deleting table {}: Start", tableNameWithType);
 
     // Remove the table from brokerResource
-    HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, realtimeTableName);
-    LOGGER.info("Deleting table {}: Removed from broker resource", realtimeTableName);
+    HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, tableNameWithType);
+    LOGGER.info("Deleting table {}: Removed from broker resource", tableNameWithType);
 
-    // Drop the table on servers
-    // TODO: Make this api idempotent and blocking by waiting for externalview to converge on controllers
-    //      instead of servers. Follow the same steps for offline tables.
-    deleteTableOnServer(realtimeTableName);
+    // Delete the table on servers
+    deleteTableOnServers(tableNameWithType);
 
-    // Cache the state and drop the table
-    Set<String> instancesForTable = null;
-    if (_helixAdmin.getResourcesInCluster(_helixClusterName).contains(realtimeTableName)) {
-      instancesForTable = getAllInstancesForTable(realtimeTableName);
-      _helixAdmin.dropResource(_helixClusterName, realtimeTableName);
-      LOGGER.info("Deleting table {}: Removed helix table resource", realtimeTableName);
-    }
+    // Remove ideal state
+    _helixDataAccessor.removeProperty(_keyBuilder.idealStates(tableNameWithType));
+    LOGGER.info("Deleting table {}: Removed ideal state", tableNameWithType);
 
     // Remove all stored segments for the table
     Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
-    _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFromPropertyStore(realtimeTableName),
+    _segmentDeletionManager.removeSegmentsFromStore(tableNameWithType, getSegmentsFromPropertyStore(tableNameWithType),
         retentionPeriodMs);
-    LOGGER.info("Deleting table {}: Removed stored segments", realtimeTableName);
+    LOGGER.info("Deleting table {}: Removed stored segments", tableNameWithType);
 
     // Remove segment metadata
-    ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore, realtimeTableName);
-    LOGGER.info("Deleting table {}: Removed segment metadata", realtimeTableName);
+    ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore, tableNameWithType);
+    LOGGER.info("Deleting table {}: Removed segment metadata", tableNameWithType);
 
     // Remove instance partitions
-    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-    InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
-        InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
-    InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
-        InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
-    LOGGER.info("Deleting table {}: Removed instance partitions", realtimeTableName);
+    if (tableType == TableType.OFFLINE) {
+      InstancePartitionsUtils.removeInstancePartitions(_propertyStore, tableNameWithType);
+    } else {
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
+          InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
+      InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
+          InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
+    }
+    LOGGER.info("Deleting table {}: Removed instance partitions", tableNameWithType);
 
-    InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, rawTableName);
-    LOGGER.info("Deleting table {}: Removed tier instance partitions", realtimeTableName);
+    // Remove tier instance partitions
+    InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, tableNameWithType);
+    LOGGER.info("Deleting table {}: Removed tier instance partitions", tableNameWithType);
 
     // Remove segment lineage
-    SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, realtimeTableName);
-    LOGGER.info("Deleting table {}: Removed segment lineage", realtimeTableName);
+    SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, tableNameWithType);
+    LOGGER.info("Deleting table {}: Removed segment lineage", tableNameWithType);
 
     // Remove task related metadata
-    MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, realtimeTableName);
-    LOGGER.info("Deleting table {}: Removed all minion task metadata", realtimeTableName);
-
-    // Remove groupId/partitionId mapping for HLC table
-    if (instancesForTable != null) {
-      for (String instance : instancesForTable) {
-        InstanceZKMetadata instanceZKMetadata = ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, instance);
-        if (instanceZKMetadata != null) {
-          instanceZKMetadata.removeResource(realtimeTableName);
-          ZKMetadataProvider.setInstanceZKMetadata(_propertyStore, instanceZKMetadata);
-        }
-      }
-    }
-    LOGGER.info("Deleting table {}: Removed groupId/partitionId mapping for HLC table", realtimeTableName);
+    MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, tableNameWithType);
+    LOGGER.info("Deleting table {}: Removed all minion task metadata", tableNameWithType);
 
     // Remove table config
-    // this should always be the last step for deletion to avoid race condition in table re-create.
-    ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, realtimeTableName);
-    LOGGER.info("Deleting table {}: Removed table config", realtimeTableName);
+    // NOTE: This should always be the last step for deletion to avoid race condition in table re-create
+    ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, tableNameWithType);
+    LOGGER.info("Deleting table {}: Removed table config", tableNameWithType);
 
-    LOGGER.info("Deleting table {}: Finish", realtimeTableName);
+    LOGGER.info("Deleting table {}: Finish", tableNameWithType);
   }
 
   /**
@@ -2152,15 +2088,6 @@ public class PinotHelixResourceManager {
     }
   }
 
-  private Set<String> getAllInstancesForTable(String tableNameWithType) {
-    Set<String> instanceSet = new HashSet<>();
-    IdealState tableIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
-    for (String partition : tableIdealState.getPartitionSet()) {
-      instanceSet.addAll(tableIdealState.getInstanceSet(partition));
-    }
-    return instanceSet;
-  }
-
   /**
    * Returns the ZK metdata for the given jobId and jobType
    * @param jobId the id of the job
@@ -2443,10 +2370,16 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Delete the table on servers by sending table deletion message
+   * Delete the table on servers by sending table deletion messages.
    */
-  private void deleteTableOnServer(String tableNameWithType) {
-    LOGGER.info("Sending delete table message for table: {}", tableNameWithType);
+  private void deleteTableOnServers(String tableNameWithType) {
+    // External view can be null for newly created table, skip sending messages
+    if (_helixDataAccessor.getProperty(_keyBuilder.externalView(tableNameWithType)) == null) {
+      LOGGER.warn("No delete table message sent for newly created table: {} without external view", tableNameWithType);
+      return;
+    }
+
+    LOGGER.info("Sending delete table messages for table: {}", tableNameWithType);
     Criteria recipientCriteria = new Criteria();
     recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
     recipientCriteria.setInstanceName("%");
@@ -2455,13 +2388,6 @@ public class PinotHelixResourceManager {
     TableDeletionMessage tableDeletionMessage = new TableDeletionMessage(tableNameWithType);
     ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
 
-    // Externalview can be null for newly created table, skip sending the message
-    if (_helixZkManager.getHelixDataAccessor()
-        .getProperty(_helixZkManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType)) == null) {
-      LOGGER.warn("No delete table message sent for newly created table: {} as the externalview is null.",
-          tableNameWithType);
-      return;
-    }
     // Infinite timeout on the recipient
     int timeoutMs = -1;
     int numMessagesSent = messagingService.send(recipientCriteria, tableDeletionMessage, null, timeoutMs);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 0187fb6836..0fb1b26dc3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -179,7 +180,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
   protected abstract void doInit();
 
   @Override
-  public void start() {
+  public synchronized void start() {
     _logger.info("Starting table data manager for table: {}", _tableNameWithType);
     doStart();
     _logger.info("Started table data manager for table: {}", _tableNameWithType);
@@ -188,7 +189,11 @@ public abstract class BaseTableDataManager implements TableDataManager {
   protected abstract void doStart();
 
   @Override
-  public void shutDown() {
+  public synchronized void shutDown() {
+    if (_shutDown) {
+      _logger.info("Table data manager for table: {} is already shut down", _tableNameWithType);
+      return;
+    }
     _logger.info("Shutting down table data manager for table: {}", _tableNameWithType);
     _shutDown = true;
     doShutdown();
@@ -197,6 +202,18 @@ public abstract class BaseTableDataManager implements TableDataManager {
 
   protected abstract void doShutdown();
 
+  /**
+   * Releases and removes all segments tracked by the table data manager.
+   */
+  protected void releaseAndRemoveAllSegments() {
+    Iterator<SegmentDataManager> iterator = _segmentDataManagerMap.values().iterator();
+    while (iterator.hasNext()) {
+      SegmentDataManager segmentDataManager = iterator.next();
+      iterator.remove();
+      releaseSegment(segmentDataManager);
+    }
+  }
+
   @Override
   public boolean isShutDown() {
     return _shutDown;
@@ -214,6 +231,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
   @Override
   public void addSegment(ImmutableSegment immutableSegment) {
     String segmentName = immutableSegment.getSegmentName();
+    Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s",
+        segmentName, _tableNameWithType);
     _logger.info("Adding immutable segment: {} to table: {}", segmentName, _tableNameWithType);
     _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT,
         immutableSegment.getSegmentMetadata().getTotalDocs());
@@ -232,6 +251,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
   @Override
   public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
       throws Exception {
+    Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s",
+        indexDir.getName(), _tableNameWithType);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
     indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
     addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, indexLoadingConfig.getSchema()));
@@ -251,6 +272,12 @@ public abstract class BaseTableDataManager implements TableDataManager {
    */
   @Override
   public void removeSegment(String segmentName) {
+    // Allow removing segment after shutdown so that we can remove the segment when the table is deleted
+    if (_shutDown) {
+      _logger.info("Table data manager is already shut down, skip removing segment: {} from table: {}", segmentName,
+          _tableNameWithType);
+      return;
+    }
     _logger.info("Removing segment: {} from table: {}", segmentName, _tableNameWithType);
     SegmentDataManager segmentDataManager = unregisterSegment(segmentName);
     if (segmentDataManager != null) {
@@ -364,6 +391,9 @@ public abstract class BaseTableDataManager implements TableDataManager {
   public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata,
       SegmentMetadata localMetadata, @Nullable Schema schema, boolean forceDownload)
       throws Exception {
+    Preconditions.checkState(!_shutDown,
+        "Table data manager is already shut down, cannot reload segment: %s of table: %s", segmentName,
+        _tableNameWithType);
     String segmentTier = getSegmentCurrentTier(segmentName);
     indexLoadingConfig.setSegmentTier(segmentTier);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
@@ -390,8 +420,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
         SegmentDirectory segmentDirectory =
             initSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
         // We should first try to reuse existing segment directory
-        if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier, segmentDirectory, indexLoadingConfig,
-            schema)) {
+        if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier, segmentDirectory, indexLoadingConfig, schema)) {
           LOGGER.info("Reloading segment: {} of table: {} using existing segment directory as no reprocessing needed",
               segmentName, _tableNameWithType);
           // No reprocessing needed, reuse the same segment
@@ -432,9 +461,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
     }
   }
 
-  private boolean canReuseExistingDirectoryForReload(SegmentZKMetadata segmentZKMetadata,
-      String currentSegmentTier, SegmentDirectory segmentDirectory, IndexLoadingConfig indexLoadingConfig,
-      Schema schema)
+  private boolean canReuseExistingDirectoryForReload(SegmentZKMetadata segmentZKMetadata, String currentSegmentTier,
+      SegmentDirectory segmentDirectory, IndexLoadingConfig indexLoadingConfig, Schema schema)
       throws Exception {
     SegmentDirectoryLoader segmentDirectoryLoader =
         SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
@@ -446,6 +474,9 @@ public abstract class BaseTableDataManager implements TableDataManager {
   public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
       SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
       throws Exception {
+    Preconditions.checkState(!_shutDown,
+        "Table data manager is already shut down, cannot add/replace segment: %s of table: %s", segmentName,
+        _tableNameWithType);
     if (localMetadata != null && hasSameCRC(zkMetadata, localMetadata)) {
       LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
           _tableNameWithType, localMetadata.getCrc());
@@ -595,12 +626,13 @@ public abstract class BaseTableDataManager implements TableDataManager {
   }
 
   // not thread safe. Caller should invoke it with safe concurrency control.
-  protected void downloadFromPeersWithoutStreaming(String segmentName, SegmentZKMetadata zkMetadata,
-      File destTarFile) throws Exception {
+  protected void downloadFromPeersWithoutStreaming(String segmentName, SegmentZKMetadata zkMetadata, File destTarFile)
+      throws Exception {
     Preconditions.checkArgument(_tableDataManagerConfig.getTablePeerDownloadScheme() != null,
-            "Download peers require non null peer download scheme");
-    List<URI> peerSegmentURIs = PeerServerSegmentFinder.getPeerServerURIs(segmentName,
-        _tableDataManagerConfig.getTablePeerDownloadScheme(), _helixManager, _tableNameWithType);
+        "Download peers require non null peer download scheme");
+    List<URI> peerSegmentURIs =
+        PeerServerSegmentFinder.getPeerServerURIs(segmentName, _tableDataManagerConfig.getTablePeerDownloadScheme(),
+            _helixManager, _tableNameWithType);
     if (peerSegmentURIs.isEmpty()) {
       String msg = String.format("segment %s doesn't have any peers", segmentName);
       LOGGER.warn(msg);
@@ -611,10 +643,10 @@ public abstract class BaseTableDataManager implements TableDataManager {
       // Next download the segment from a randomly chosen server using configured scheme.
       SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(peerSegmentURIs, destTarFile, zkMetadata.getCrypterName());
       LOGGER.info("Fetched segment {} from peers: {} to: {} of size: {}", segmentName, peerSegmentURIs, destTarFile,
-              destTarFile.length());
+          destTarFile.length());
     } catch (AttemptsExceededException e) {
       LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from peers {} to: {}", segmentName,
-              _tableNameWithType, peerSegmentURIs, destTarFile);
+          _tableNameWithType, peerSegmentURIs, destTarFile);
       _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES, 1L);
       throw e;
     }
@@ -636,12 +668,12 @@ public abstract class BaseTableDataManager implements TableDataManager {
     String uri = zkMetadata.getDownloadUrl();
     AtomicInteger attempts = new AtomicInteger(0);
     try {
-        File ret = SegmentFetcherFactory.fetchAndStreamUntarToLocal(uri, tempRootDir, maxStreamRateInByte, attempts);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
-            attempts.get());
-        LOGGER.info("Downloaded and untarred segment: {} for table: {} from: {} attempts: {}", segmentName,
-            _tableNameWithType, uri, attempts.get());
-        return ret;
+      File ret = SegmentFetcherFactory.fetchAndStreamUntarToLocal(uri, tempRootDir, maxStreamRateInByte, attempts);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
+          attempts.get());
+      LOGGER.info("Downloaded and untarred segment: {} for table: {} from: {} attempts: {}", segmentName,
+          _tableNameWithType, uri, attempts.get());
+      return ret;
     } catch (Exception e) {
       _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
           attempts.get());
@@ -771,6 +803,10 @@ public abstract class BaseTableDataManager implements TableDataManager {
   @Override
   public boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
       SegmentZKMetadata zkMetadata) {
+    Preconditions.checkState(!_shutDown,
+        "Table data manager is already shut down, cannot load existing segment: %s of table: %s", segmentName,
+        _tableNameWithType);
+
     // Try to recover the segment from potential segment reloading failure.
     String segmentTier = zkMetadata.getTier();
     File indexDir = getSegmentDataDir(segmentName, segmentTier, indexLoadingConfig.getTableConfig());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index bbc392d4d0..ffb5923411 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -74,12 +74,6 @@ public interface InstanceDataManager {
   void deleteTable(String tableNameWithType)
       throws Exception;
 
-  /**
-   * Adds a segment from local disk into an OFFLINE table.
-   */
-  void addOfflineSegment(String offlineTableName, String segmentName, File indexDir)
-      throws Exception;
-
   /**
    * Adds a segment into an REALTIME table.
    * <p>The segment might be committed or under consuming.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index a9635f2a66..b78874a8e3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -117,8 +117,10 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
 
   @Override
   public void addSegment(ImmutableSegment immutableSegment) {
-    super.addSegment(immutableSegment);
     String segmentName = immutableSegment.getSegmentName();
+    Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s",
+        segmentName, _tableNameWithType);
+    super.addSegment(immutableSegment);
     try {
       if (loadLookupTable()) {
         _logger.info("Successfully loaded lookup table after adding segment: {}", segmentName);
@@ -134,6 +136,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
 
   @Override
   public void removeSegment(String segmentName) {
+    // Allow removing segment after shutdown so that we can remove the segment when the table is deleted
+    if (_shutDown) {
+      _logger.info("Table data manager is already shut down, skip removing segment: {}", segmentName);
+      return;
+    }
     super.removeSegment(segmentName);
     try {
       if (loadLookupTable()) {
@@ -150,6 +157,7 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
 
   @Override
   protected void doShutdown() {
+    releaseAndRemoveAllSegments();
     closeDimensionTable(_dimensionTable.get());
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
index 7e17da42cb..aef0d80b9b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
@@ -38,5 +38,6 @@ public class OfflineTableDataManager extends BaseTableDataManager {
 
   @Override
   protected void doShutdown() {
+    releaseAndRemoveAllSegments();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 3bf7caf24b..ebd5aee4f2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -54,7 +54,6 @@ import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
 import org.apache.pinot.segment.local.dedup.TableDedupMetadataManagerFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -251,18 +250,14 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     if (_tableUpsertMetadataManager != null) {
       // Stop the upsert metadata manager first to prevent removing metadata when destroying segments
       _tableUpsertMetadataManager.stop();
-      for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) {
-        segmentDataManager.destroy();
-      }
+      releaseAndRemoveAllSegments();
       try {
         _tableUpsertMetadataManager.close();
       } catch (IOException e) {
         _logger.warn("Cannot close upsert metadata manager properly for table: {}", _tableNameWithType, e);
       }
     } else {
-      for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) {
-        segmentDataManager.destroy();
-      }
+      releaseAndRemoveAllSegments();
     }
     if (_leaseExtender != null) {
       _leaseExtender.shutDown();
@@ -382,6 +377,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   @Override
   public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata segmentZKMetadata)
       throws Exception {
+    Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s",
+        segmentName, _tableNameWithType);
     SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
     if (segmentDataManager != null) {
       _logger.warn("Skipping adding existing segment: {} for table: {} with data manager class: {}", segmentName,
@@ -438,8 +435,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     int partitionGroupId = llcSegmentName.getPartitionGroupId();
     Semaphore semaphore = _partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new Semaphore(1));
     PartitionUpsertMetadataManager partitionUpsertMetadataManager =
-        _tableUpsertMetadataManager != null ? _tableUpsertMetadataManager.getOrCreatePartitionManager(
-            partitionGroupId) : null;
+        _tableUpsertMetadataManager != null ? _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId)
+            : null;
     PartitionDedupMetadataManager partitionDedupMetadataManager =
         _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
             : null;
@@ -488,6 +485,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   @Override
   public void addSegment(ImmutableSegment immutableSegment) {
+    String segmentName = immutableSegment.getSegmentName();
+    Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s",
+        segmentName, _tableNameWithType);
     if (isUpsertEnabled()) {
       handleUpsert(immutableSegment);
       return;
@@ -648,17 +648,6 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     }
   }
 
-  /**
-   * Replaces a committed HLC REALTIME segment.
-   */
-  public void replaceHLSegment(SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig)
-      throws Exception {
-    ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, _tableNameWithType, segmentZKMetadata);
-    File indexDir = new File(_indexDir, segmentZKMetadata.getSegmentName());
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
-    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
-  }
-
   /**
    * Replaces a committed LLC REALTIME segment.
    */
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index da466e4e93..408558d7e5 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
@@ -591,8 +592,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
    */
   protected List<File> unpackTarData(String tarFileName, File outputDir)
       throws Exception {
-    InputStream inputStream =
-        BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName);
+    InputStream inputStream = BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName);
     Assert.assertNotNull(inputStream);
     return TarGzCompressionUtils.untar(inputStream, outputDir);
   }
@@ -618,9 +618,8 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     String kafkaBroker = "localhost:" + getKafkaPort();
     StreamDataProducer producer = null;
     try {
-      producer =
-          StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
-              getDefaultKafkaProducerProperties(kafkaBroker));
+      producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+          getDefaultKafkaProducerProperties(kafkaBroker));
       ClusterIntegrationTestUtils.pushCsvIntoKafka(csvFile, kafkaTopic, partitionColumnIndex, injectTombstones(),
           producer);
     } catch (Exception e) {
@@ -635,9 +634,8 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     String kafkaBroker = "localhost:" + getKafkaPort();
     StreamDataProducer producer = null;
     try {
-      producer =
-          StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
-              getDefaultKafkaProducerProperties(kafkaBroker));
+      producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+          getDefaultKafkaProducerProperties(kafkaBroker));
       ClusterIntegrationTestUtils.pushCsvIntoKafka(csvRecords, kafkaTopic, partitionColumnIndex, injectTombstones(),
           producer);
     } catch (Exception e) {
@@ -734,11 +732,24 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
 
   protected void waitForDocsLoaded(long timeoutMs, boolean raiseError, String tableName) {
     final long countStarResult = getCountStarResult();
-    TestUtils.waitForCondition(
-        () -> getCurrentCountStarResult(tableName) == countStarResult, 100L, timeoutMs,
+    TestUtils.waitForCondition(() -> getCurrentCountStarResult(tableName) == countStarResult, 100L, timeoutMs,
         "Failed to load " + countStarResult + " documents", raiseError, Duration.ofMillis(timeoutMs / 10));
   }
 
+  /**
+   * Wait for servers to remove the table data manager after the table is deleted.
+   */
+  protected void waitForTableDataManagerRemoved(String tableNameWithType) {
+    TestUtils.waitForCondition(aVoid -> {
+      for (BaseServerStarter serverStarter : _serverStarters) {
+        if (serverStarter.getServerInstance().getInstanceDataManager().getTableDataManager(tableNameWithType) != null) {
+          return false;
+        }
+      }
+      return true;
+    }, 60_000L, "Failed to remove table data manager for table: " + tableNameWithType);
+  }
+
   /**
    * Reset table utils.
    */
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index ab79381b05..e850b4ba2b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -36,7 +36,6 @@ import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.query.utils.idset.IdSets;
-import org.apache.pinot.server.starter.helix.BaseServerStarter;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -82,25 +81,6 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
     return DEFAULT_NUM_QUERIES_TO_GENERATE;
   }
 
-  /**
-   * Test server table data manager deletion after the table is dropped
-   */
-  protected void cleanupTestTableDataManager(String tableNameWithType) {
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        for (BaseServerStarter serverStarter : _serverStarters) {
-          if (serverStarter.getServerInstance().getInstanceDataManager().getTableDataManager(tableNameWithType)
-              != null) {
-            return false;
-          }
-        }
-        return true;
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to delete table data managers");
-  }
-
   /**
    * Test hard-coded queries.
    * @throws Exception
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
index c95d5d7f2c..83adb4dfe9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
@@ -180,7 +180,7 @@ public abstract class BaseRealtimeClusterIntegrationTest extends BaseClusterInte
   public void tearDown()
       throws Exception {
     dropRealtimeTable(getTableName());
-    cleanupTestTableDataManager(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
+    waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
     stopServer();
     stopBroker();
     stopController();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index e1ad0880c6..0debb9d09a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -432,7 +432,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
     waitForNumOfSegmentsBecomeOnline(offlineTableName, 1);
     dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE);
-    cleanupTestTableDataManager(offlineTableName);
+    waitForTableDataManagerRemoved(offlineTableName);
   }
 
   private void waitForNumOfSegmentsBecomeOnline(String tableNameWithType, int numSegments)
@@ -2386,15 +2386,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     for (int i = 2; i < 20; i++) {
       query = String.format("SELECT distinctCountHLL(FlightNum, %d) FROM mytable ", i);
       assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 2]);
-      assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
-          expectedResults[i - 2]);
+      assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 2]);
     }
 
     // Default HLL is set as log2m=12
     query = "SELECT distinctCountHLL(FlightNum) FROM mytable ";
     assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[10]);
-    assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
-        expectedResults[10]);
+    assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[10]);
   }
 
   @Test
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 45965f4bc4..2de9c586c0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -61,8 +61,8 @@ public interface TableDataManager {
   void start();
 
   /**
-   * Shuts down the table data manager. Should be called only once. After calling shut down, no other method should be
-   * called.
+   * Shuts down the table data manager. After calling shut down, no other method should be called.
+   * NOTE: Shut down might be called multiple times. The implementation should be able to handle that.
    */
   void shutDown();
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index bdc4baf58d..d5c0c8afd8 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
@@ -42,7 +41,9 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -213,26 +214,6 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     LOGGER.info("Helix instance data manager shut down");
   }
 
-  @Override
-  public void addOfflineSegment(String offlineTableName, String segmentName, File indexDir)
-      throws Exception {
-    LOGGER.info("Adding segment: {} to table: {}", segmentName, offlineTableName);
-    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
-    Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", offlineTableName);
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
-    SegmentZKMetadata zkMetadata =
-        ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, offlineTableName, segmentName);
-    Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for offline segment: %s, table: %s",
-        segmentName, offlineTableName);
-
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
-    indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
-
-    _tableDataManagerMap.computeIfAbsent(offlineTableName, k -> createTableDataManager(k, tableConfig))
-        .addSegment(indexDir, indexLoadingConfig);
-    LOGGER.info("Added segment: {} to table: {}", segmentName, offlineTableName);
-  }
-
   @Override
   public void addRealtimeSegment(String realtimeTableName, String segmentName)
       throws Exception {
@@ -264,28 +245,43 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   @Override
   public void deleteTable(String tableNameWithType)
       throws Exception {
-    // Wait externalview to converge
-    long endTimeMs = System.currentTimeMillis() + _externalViewDroppedMaxWaitMs;
-    do {
-      ExternalView externalView = _helixManager.getHelixDataAccessor()
-          .getProperty(_helixManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType));
-      if (externalView == null) {
-        LOGGER.info("ExternalView converged for the table to delete: {}", tableNameWithType);
-        _tableDataManagerMap.compute(tableNameWithType, (k, v) -> {
-          if (v != null) {
-            v.shutDown();
-            LOGGER.info("Removed table: {}", tableNameWithType);
-          } else {
-            LOGGER.warn("Failed to find table data manager for table: {}, skip removing the table", tableNameWithType);
-          }
-          return null;
-        });
-        return;
-      }
-      Thread.sleep(_externalViewDroppedCheckInternalMs);
-    } while (System.currentTimeMillis() < endTimeMs);
-    throw new TimeoutException(
-        "Timeout while waiting for ExternalView to converge for the table to delete: " + tableNameWithType);
+    TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
+    if (tableDataManager == null) {
+      LOGGER.warn("Failed to find table data manager for table: {}, skip deleting the table", tableNameWithType);
+      return;
+    }
+    LOGGER.info("Shutting down table data manager for table: {}", tableNameWithType);
+    tableDataManager.shutDown();
+    LOGGER.info("Finished shutting down table data manager for table: {}", tableNameWithType);
+
+    try {
+      // Wait for external view to disappear or become empty before removing the table data manager.
+      //
+      // When creating the table, controller will check whether the external view exists, and allow table creation only
+      // if it doesn't exist. If the table is recreated just after external view disappeared, there is a small chance
+      // that server won't realize the external view is removed because it is recreated before the server checks it. In
+      // order to handle this scenario, we want to remove the table data manager when the external view exists but is
+      // empty.
+      HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor();
+      PropertyKey externalViewKey = helixDataAccessor.keyBuilder().externalView(tableNameWithType);
+      long endTimeMs = System.currentTimeMillis() + _externalViewDroppedMaxWaitMs;
+      do {
+        ExternalView externalView = helixDataAccessor.getProperty(externalViewKey);
+        if (externalView == null) {
+          LOGGER.info("ExternalView is dropped for table: {}", tableNameWithType);
+          return;
+        }
+        if (externalView.getRecord().getMapFields().isEmpty()) {
+          LOGGER.info("ExternalView is empty for table: {}", tableNameWithType);
+          return;
+        }
+        Thread.sleep(_externalViewDroppedCheckInternalMs);
+      } while (System.currentTimeMillis() < endTimeMs);
+      LOGGER.warn("ExternalView still exists after {}ms for table: {}", _externalViewDroppedMaxWaitMs,
+          tableNameWithType);
+    } finally {
+      _tableDataManagerMap.remove(tableNameWithType);
+    }
   }
 
   @Override
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index b85d13b953..510f4fa74d 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -29,8 +29,6 @@ import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
@@ -75,49 +73,61 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
 
     @Transition(from = "OFFLINE", to = "CONSUMING")
    public void onBecomeConsumingFromOffline(Message message, NotificationContext context) {
-      Preconditions.checkState(SegmentName.isLowLevelConsumerSegmentName(message.getPartitionName()),
-          "Tried to go into CONSUMING state on non-low level segment");
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message);
-      // We do the same processing as usual for going to the consuming state, which adds the segment to the table data
-      // manager and starts Kafka consumption
-      onBecomeOnlineFromOffline(message, context);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        String errorMessage =
+            String.format("Caught exception in state transition OFFLINE -> CONSUMING for table: %s, segment: %s",
+                realtimeTableName, segmentName);
+        _logger.error(errorMessage, e);
+        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+        if (tableDataManager != null) {
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        Utils.rethrowException(e);
+      }
     }
 
     @Transition(from = "CONSUMING", to = "ONLINE")
     public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) {
+      _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message);
       String realtimeTableName = message.getResourceName();
-      String segmentNameStr = message.getPartitionName();
-      LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
-
+      String segmentName = message.getPartitionName();
       TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
-      Preconditions.checkNotNull(tableDataManager);
-      tableDataManager.onConsumingToOnline(segmentNameStr);
-      SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentNameStr);
+      Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+      tableDataManager.onConsumingToOnline(segmentName);
+      SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentName);
       // For this transition to be correct in helix, we should already have a segment that is consuming
-      if (acquiredSegment == null) {
-        throw new RuntimeException("Segment " + segmentNameStr + " + not present ");
-      }
+      Preconditions.checkState(acquiredSegment != null, "Failed to find segment: %s in table: %s", segmentName,
+          realtimeTableName);
 
       // TODO: https://github.com/apache/pinot/issues/10049
       try {
         if (!(acquiredSegment instanceof LLRealtimeSegmentDataManager)) {
           // We found an LLC segment that is not consuming right now, must be that we already swapped it with a
           // segment that has been built. Nothing to do for this state transition.
-          _logger
-              .info("Segment {} not an instance of LLRealtimeSegmentDataManager. Reporting success for the transition",
-                  acquiredSegment.getSegmentName());
+          _logger.info(
+              "Segment {} not an instance of LLRealtimeSegmentDataManager. Reporting success for the transition",
+              acquiredSegment.getSegmentName());
           return;
         }
         LLRealtimeSegmentDataManager segmentDataManager = (LLRealtimeSegmentDataManager) acquiredSegment;
-        SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
-            .getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName, segmentNameStr);
+        SegmentZKMetadata segmentZKMetadata =
+            ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName,
+                segmentName);
         segmentDataManager.goOnlineFromConsuming(segmentZKMetadata);
-      } catch (InterruptedException e) {
-        String errorMessage = String.format("State transition interrupted for segment %s.", segmentNameStr);
-        _logger.warn(errorMessage, e);
-        tableDataManager
-            .addSegmentError(segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
-        throw new RuntimeException(e);
+      } catch (Exception e) {
+        String errorMessage =
+            String.format("Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+                realtimeTableName, segmentName);
+        _logger.error(errorMessage, e);
+        tableDataManager.addSegmentError(segmentName,
+            new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        Utils.rethrowException(e);
       } finally {
         tableDataManager.releaseSegment(acquiredSegment);
       }
@@ -131,7 +141,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       try {
         _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
       } catch (Exception e) {
-        _logger.error("Caught exception in state transition from CONSUMING -> OFFLINE for resource: {}, partition: {}",
+        _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}",
            realtimeTableName, segmentName, e);
         Utils.rethrowException(e);
       }
@@ -141,15 +151,16 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
     public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) {
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message);
       String realtimeTableName = message.getResourceName();
-      String segmentNameStr = message.getPartitionName();
+      String segmentName = message.getPartitionName();
       TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
-      Preconditions.checkNotNull(tableDataManager);
-      tableDataManager.onConsumingToDropped(segmentNameStr);
+      Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+      tableDataManager.onConsumingToDropped(segmentName);
       try {
-        onBecomeOfflineFromConsuming(message, context);
-        onBecomeDroppedFromOffline(message, context);
-      } catch (final Exception e) {
-        _logger.error("Caught exception on CONSUMING -> DROPPED state transition", e);
+        _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+        _instanceDataManager.deleteSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}",
+            realtimeTableName, segmentName, e);
         Utils.rethrowException(e);
       }
     }
@@ -160,7 +171,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       String tableNameWithType = message.getResourceName();
       String segmentName = message.getPartitionName();
       try {
-        TableType tableType = TableNameBuilder.getTableTypeFromTableName(message.getResourceName());
+        TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
         Preconditions.checkNotNull(tableType);
         if (tableType == TableType.OFFLINE) {
           _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName);
@@ -168,14 +179,14 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
           _instanceDataManager.addRealtimeSegment(tableNameWithType, segmentName);
         }
       } catch (Exception e) {
-        String errorMessage = String
-            .format("Caught exception in state transition from OFFLINE -> ONLINE for resource: %s, partition: %s",
+        String errorMessage =
+            String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s",
                 tableNameWithType, segmentName);
         _logger.error(errorMessage, e);
         TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
         if (tableDataManager != null) {
-          tableDataManager
-              .addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
         }
         Utils.rethrowException(e);
       }
@@ -191,7 +202,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       try {
         _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
       } catch (Exception e) {
-        _logger.error("Caught exception in state transition from ONLINE -> OFFLINE for resource: {}, partition: {}",
+        _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}",
            tableNameWithType, segmentName, e);
         Utils.rethrowException(e);
       }
@@ -205,8 +216,9 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       String segmentName = message.getPartitionName();
       try {
         _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
-      } catch (final Exception e) {
-        _logger.error("Cannot drop the segment : " + segmentName + " from server!\n" + e.getMessage(), e);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
         Utils.rethrowException(e);
       }
     }
@@ -214,18 +226,35 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
     @Transition(from = "ONLINE", to = "DROPPED")
     public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
       try {
-        onBecomeOfflineFromOnline(message, context);
-        onBecomeDroppedFromOffline(message, context);
-      } catch (final Exception e) {
-        _logger.error("Caught exception on ONLINE -> DROPPED state transition", e);
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
         Utils.rethrowException(e);
       }
     }
 
     @Transition(from = "ERROR", to = "OFFLINE")
     public void onBecomeOfflineFromError(Message message, NotificationContext context) {
-      _logger.info("Resetting the state for segment:{} from ERROR to OFFLINE", message.getPartitionName());
+      _logger.info("SegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : " + message);
+    }
+
+    @Transition(from = "ERROR", to = "DROPPED")
+    public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+      _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
     }
   }
 }
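
For reference, the new server-side deletion flow in HelixInstanceDataManager above reduces to: shut down the table data manager first, then poll until the table's external view is dropped (or recreated but still empty), and only then remove the manager from the map. A condensed sketch of that polling loop, using the same Helix calls as the diff but with a hypothetical class name and caller-supplied timeouts:

    import org.apache.helix.HelixDataAccessor;
    import org.apache.helix.PropertyKey;
    import org.apache.helix.model.ExternalView;

    // Condensed from the HelixInstanceDataManager.deleteTable() hunk above.
    final class ExternalViewDropWaiter {
      /** Polls until the external view is dropped or empty, or maxWaitMs elapses. */
      static void waitForDrop(HelixDataAccessor accessor, String tableNameWithType, long maxWaitMs,
          long checkIntervalMs)
          throws InterruptedException {
        PropertyKey externalViewKey = accessor.keyBuilder().externalView(tableNameWithType);
        long endTimeMs = System.currentTimeMillis() + maxWaitMs;
        do {
          ExternalView externalView = accessor.getProperty(externalViewKey);
          if (externalView == null || externalView.getRecord().getMapFields().isEmpty()) {
            return; // dropped, or recreated-but-empty: safe to remove the table data manager
          }
          Thread.sleep(checkIntervalMs);
        } while (System.currentTimeMillis() < endTimeMs);
        // On timeout the caller still removes the table data manager (see the finally block above).
      }
    }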

