This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a5f3dc5  Move segment download and segment reload into TableDataManager (#7319)
a5f3dc5 is described below

commit a5f3dc507e6441baca35dae5bbfad122356683b6
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Wed Sep 15 09:38:14 2021 -0700

    Move segment download and segment reload into TableDataManager (#7319)
    
    Deletes SegmentFetcherAndLoader and moves its functionality into
    BaseTableDataManager, preparing to combine the download logic with
    RealtimeTableDataManager. Also adds unit tests.
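    
    For reference, a rough sketch of how a caller might drive the new
    per-table reload entry point (the zkMetadata, indexLoadingConfig and
    schema values stand in for whatever the caller already has; only the
    method signature comes from this commit):
    
        SegmentMetadata localMetadata = new SegmentMetadataImpl(indexDir);
        // Downloads from deep store only when the local CRC mismatches the
        // CRC in SegmentZKMetadata, unless forceDownload is set.
        tableDataManager.reloadSegment("mySegment", indexLoadingConfig,
            zkMetadata, localMetadata, schema, /*forceDownload=*/ false);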
---
 .../core/data/manager/BaseTableDataManager.java    | 230 +++++++
 .../core/data/manager/InstanceDataManager.java     |  21 +-
 .../manager/realtime/RealtimeTableDataManager.java |  25 +-
 ...=> BaseTableDataManagerAcquireSegmentTest.java} |  37 +-
 .../data/manager/BaseTableDataManagerTest.java     | 664 ++++++++++-----------
 .../realtime/RealtimeTableDataManagerTest.java     |  56 ++
 .../local/data/manager/TableDataManager.java       |  24 +
 .../server/starter/helix/BaseServerStarter.java    |  28 +-
 .../starter/helix/HelixInstanceDataManager.java    |  96 ++-
 .../starter/helix/SegmentFetcherAndLoader.java     | 257 --------
 .../helix/SegmentMessageHandlerFactory.java        |  20 +-
 .../SegmentOnlineOfflineStateModelFactory.java     |   9 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |   6 +-
 13 files changed, 751 insertions(+), 722 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 2933aa8..99bb521 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -18,32 +18,46 @@
  */
 package org.apache.pinot.core.data.manager;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.LoadingCache;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.Pair;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -256,4 +270,220 @@ public abstract class BaseTableDataManager implements TableDataManager {
           .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue));
     }
   }
+
+  @Override
+  public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata,
+      SegmentMetadata localMetadata, @Nullable Schema schema, boolean forceDownload)
+      throws Exception {
+    File indexDir = localMetadata.getIndexDir();
+    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
+
+    File parentFile = indexDir.getParentFile();
+    File segmentBackupDir =
+        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+    try {
+      // First rename the index directory to the segment backup directory so that the original segment keeps all its
+      // file descriptors pointing to the backup directory and continues to serve queries properly
+
+      // Rename index directory to segment backup directory (atomic)
+      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+
+      // Download from remote or copy from local backup directory into index directory,
+      // and then continue to load the segment from index directory.
+      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
+      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+        if (forceDownload) {
+          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
+        } else {
+          LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}", segmentName,
+              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+        }
+        indexDir = downloadSegment(segmentName, zkMetadata);
+      } else {
+        LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType);
+        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+      }
+
+      // Load from index directory and replace the old segment in memory.
+      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+
+      // Rename segment backup directory to segment temporary directory (atomic)
+      // We rename first and then delete because renaming is atomic while deletion is not. Once the segment backup
+      // directory is renamed to the segment temporary directory, we know the reload has succeeded, so the segment
+      // temporary directory can be deleted safely
+      File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+          "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir,
+          segmentTempDir);
+      FileUtils.deleteDirectory(segmentTempDir);
+    } catch (Exception reloadFailureException) {
+      try {
+        LoaderUtils.reloadFailureRecovery(indexDir);
+      } catch (Exception recoveryFailureException) {
+        LOGGER.error("Failed to recover after reload failure", recoveryFailureException);
+        reloadFailureException.addSuppressed(recoveryFailureException);
+      }
+      throw reloadFailureException;
+    }
+  }
+
+  @Override
+  public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      throws Exception {
+    if (!isNewSegment(zkMetadata, localMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return;
+    }
+
+    // Try to recover if no local metadata is provided.
+    if (localMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
+      localMetadata = recoverSegmentQuietly(segmentName);
+      if (!isNewSegment(zkMetadata, localMetadata)) {
+        LOGGER.info("Segment: {} of table: {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
+            localMetadata.getCrc());
+        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+          return;
+        }
+        // Set local metadata to null to indicate that the local segment failed to load,
+        // although it exists and has the same crc as the remote one.
+        localMetadata = null;
+      }
+    }
+
+    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download",
+        segmentName, _tableNameWithType);
+
+    // Download the segment and replace the local one, either because the local segment failed to
+    // recover, or because the segment data was updated and now has a new CRC.
+    if (localMetadata == null) {
+      LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType);
+    } else {
+      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}", segmentName,
+          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    }
+    File indexDir = downloadSegment(segmentName, zkMetadata);
+    addSegment(indexDir, indexLoadingConfig);
+    LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType,
+        zkMetadata.getCrc());
+  }
+
+  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
+    return true;
+  }
+
+  protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    // TODO: may support download from peer servers for real-time tables.
+    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+  }
+
+  /**
+   * A server restart during segment reload might leave the segment directory in an inconsistent state, e.g. the
+   * index directory might not exist while the segment backup directory does. This method tries to recover from a
+   * reload failure before checking the existence of the index directory and loading segment metadata from it.
+   */
+  private SegmentMetadata recoverSegmentQuietly(String segmentName) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      LoaderUtils.reloadFailureRecovery(indexDir);
+      if (!indexDir.exists()) {
+        LOGGER.info("Segment: {} of table: {} is not found on disk", segmentName, _tableNameWithType);
+        return null;
+      }
+      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
+      LOGGER.info("Segment: {} of table: {} with crc: {} from disk is ready for loading", segmentName,
+          _tableNameWithType, localMetadata.getCrc());
+      return localMetadata;
+    } catch (Exception e) {
+      LOGGER.error("Failed to recover segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
+      FileUtils.deleteQuietly(indexDir);
+      return null;
+    }
+  }
+
+  private boolean loadSegmentQuietly(String segmentName, IndexLoadingConfig indexLoadingConfig) {
+    File indexDir = getSegmentDataDir(segmentName);
+    try {
+      addSegment(indexDir, indexLoadingConfig);
+      LOGGER.info("Loaded segment: {} of table: {} from disk", segmentName, _tableNameWithType);
+      return true;
+    } catch (Exception e) {
+      FileUtils.deleteQuietly(indexDir);
+      LOGGER.error("Failed to load segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
+      return false;
+    }
+  }
+
+  private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata)
+      throws Exception {
+    File tempRootDir = getSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
+    FileUtils.forceMkdir(tempRootDir);
+    try {
+      File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
+      return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
+    } finally {
+      FileUtils.deleteQuietly(tempRootDir);
+    }
+  }
+
+  @VisibleForTesting
+  File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir)
+      throws Exception {
+    File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    String uri = zkMetadata.getDownloadUrl();
+    try {
+      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName());
+      LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName,
+          _tableNameWithType, uri, tarFile, tarFile.length());
+      return tarFile;
+    } catch (AttemptsExceededException e) {
+      LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName,
+          _tableNameWithType, uri, tarFile);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
+      throw e;
+    }
+  }
+
+  @VisibleForTesting
+  File untarAndMoveSegment(String segmentName, File tarFile, File tempRootDir)
+      throws IOException {
+    File untarDir = new File(tempRootDir, segmentName);
+    try {
+      // If an exception is thrown when untarring, it means the tar file is broken
+      // or not found after the retry. Thus, there's no need to retry again.
+      File untaredSegDir = TarGzCompressionUtils.untar(tarFile, untarDir).get(0);
+      LOGGER.info("Uncompressed tar file: {} into target dir: {}", tarFile, untarDir);
+      // Replace the existing index directory.
+      File indexDir = getSegmentDataDir(segmentName);
+      FileUtils.deleteDirectory(indexDir);
+      FileUtils.moveDirectory(untaredSegDir, indexDir);
+      LOGGER.info("Successfully downloaded segment: {} of table: {} to index dir: {}", segmentName, _tableNameWithType,
+          indexDir);
+      return indexDir;
+    } catch (Exception e) {
+      LOGGER.error("Failed to untar segment: {} of table: {} from: {} to: {}", segmentName, _tableNameWithType, tarFile,
+          untarDir);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UNTAR_FAILURES, 1L);
+      throw e;
+    }
+  }
+
+  @VisibleForTesting
+  File getSegmentDataDir(String segmentName) {
+    return new File(_indexDir, segmentName);
+  }
+
+  @VisibleForTesting
+  static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata) {
+    return localMetadata == null || !hasSameCRC(zkMetadata, localMetadata);
+  }
+
+  private static boolean hasSameCRC(SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata) {
+    return zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc());
+  }
 }
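
The rename-then-delete sequence in reloadSegment() above is a general
crash-safe replace; a minimal standalone sketch of the same idea (the
".segment.bak"/".segment.tmp" suffixes and the helper name are illustrative,
not the exact CommonConstants values):

  static void crashSafeReplace(File indexDir, File stagedData) throws Exception {
    File parent = indexDir.getParentFile();
    File backupDir = new File(parent, indexDir.getName() + ".segment.bak");
    File tempDir = new File(parent, indexDir.getName() + ".segment.tmp");
    // Atomic rename: existing readers keep serving via their open file descriptors.
    Preconditions.checkState(indexDir.renameTo(backupDir), "backup rename failed");
    FileUtils.copyDirectory(stagedData, indexDir);  // stage the new contents
    // Atomic rename commits the reload; only then is the non-atomic delete
    // safe, since a crash at this point can no longer lose live data.
    Preconditions.checkState(backupDir.renameTo(tempDir), "temp rename failed");
    FileUtils.deleteDirectory(tempDir);
  }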
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 4cb2da2..66074a8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -80,15 +80,26 @@ public interface InstanceDataManager {
       throws Exception;
 
   /**
-   * Reloads a segment in a table.
+   * Reloads a segment in a table. This method can download a new segment to replace the local
+   * one before loading. The download happens when the local segment's CRC mismatches the remote
+   * segment's, but it can also be forced regardless of CRC.
    */
-  void reloadSegment(String tableNameWithType, String segmentName)
+  void reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload)
       throws Exception;
 
   /**
-   * Reloads all segment in a table.
+   * Reloads all segments in a table.
    */
-  void reloadAllSegments(String tableNameWithType)
+  void reloadAllSegments(String tableNameWithType, boolean forceDownload)
+      throws Exception;
+
+  /**
+   * Adds or replaces a segment in a table. Unlike segment reloading, this method
+   * doesn't assume the TableDataManager object exists, and it can initialize
+   * the TableDataManager for the segment. A new segment is downloaded if the
+   * local one is not working or has a different CRC from the remote one.
+   */
+  void addOrReplaceSegment(String tableNameWithType, String segmentName)
       throws Exception;
   /**
@@ -116,7 +127,7 @@ public interface InstanceDataManager {
   /**
    * Returns the directory for un-tarred segment data.
    */
-  String getSegmentDataDirectory();
+  File getSegmentDataDirectory(String tableNameWithType, String segmentName);
 
   /**
    * Returns the directory for tarred segment files.
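
Taken together, the revised interface can be driven roughly as follows (a
sketch; the instanceDataManager variable and the table/segment names are
placeholders):

  // Reload one segment, downloading only on CRC mismatch:
  instanceDataManager.reloadSegment("myTable_OFFLINE", "myTable_0", false);
  // Reload every segment in the table, forcing a re-download:
  instanceDataManager.reloadAllSegments("myTable_OFFLINE", true);
  // First-time add or replace, initializing the TableDataManager if needed:
  instanceDataManager.addOrReplaceSegment("myTable_OFFLINE", "myTable_1");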
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 931498a..f8d57ff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -251,15 +251,12 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
    * This call comes in one of two ways:
    * For HL Segments:
    * - We are being directed by helix to own up all the segments that we committed and are still in retention. In
-   * this case
-   *   we treat it exactly like how OfflineTableDataManager would -- wrap it into an OfflineSegmentDataManager, and
-   * put it
-   *   in the map.
+   * this case we treat it exactly like how OfflineTableDataManager would -- wrap it into an
+   * OfflineSegmentDataManager, and put it in the map.
    * - We are being asked to own up a new realtime segment. In this case, we wrap the segment with a
-   * RealTimeSegmentDataManager
-   *   (that kicks off consumption). When the segment is committed we get notified via the notifySegmentCommitted
-   * call, at
-   *   which time we replace the segment with the OfflineSegmentDataManager
+   * RealTimeSegmentDataManager (that kicks off consumption). When the segment is committed we get notified via the
+   * notifySegmentCommitted call, at which time we replace the segment with the OfflineSegmentDataManager
+   *
    * For LL Segments:
    * - We are being asked to start consuming from a partition.
    * - We did not know about the segment and are being asked to download and own the segment (re-balancing, or
@@ -412,7 +409,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     partitionUpsertMetadataManager.addSegment(immutableSegment, recordInfoIterator);
   }
 
-  public void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata segmentZKMetadata,
+  @Override
+  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
+    // Only LLC immutable segment allows download.
+    if (SegmentName.isHighLevelConsumerSegmentName(segmentName) || zkMetadata.getStatus() == Status.IN_PROGRESS) {
+      return false;
+    }
+    // TODO: may support download from peer servers as well.
+    return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl());
+  }
+
+  void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata segmentZKMetadata,
       IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
     String uri = segmentZKMetadata.getDownloadUrl();
     if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) {
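
A hedged sketch of what the allowDownload() override implies (the mocked
setup and segment name are illustrative; the commit's new
RealtimeTableDataManagerTest carries the real assertions):

  SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
  when(zkmd.getStatus()).thenReturn(Status.DONE);
  when(zkmd.getDownloadUrl()).thenReturn("http://controller/segments/seg01");
  // A completed LLC segment with a concrete download URL may be downloaded;
  // HLC names, IN_PROGRESS status, or the peer-download placeholder URI may not.
  assertTrue(realtimeTableDataManager.allowDownload("myTable__0__1__20210901T0000Z", zkmd));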
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
similarity index 94%
copy from pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
copy to pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 32edb2e..a34b569 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -49,14 +49,13 @@ import org.testng.annotations.Test;
 import static org.mockito.Mockito.*;
 
 
-public class BaseTableDataManagerTest {
-
-  private static final Random RANDOM = new Random();
+public class BaseTableDataManagerAcquireSegmentTest {
   private static final String TABLE_NAME = "testTable";
   private static final String SEGMENT_PREFIX = "segment";
 
   // Set once for the suite
   private File _tmpDir;
+  private Random _random;
 
   // Set once for every test
   private volatile int _nDestroys;
@@ -82,6 +81,10 @@ public class BaseTableDataManagerTest {
       throws Exception {
     _tmpDir = File.createTempFile("OfflineTableDataManagerTest", null);
     _tmpDir.deleteOnExit();
+
+    long seed = System.currentTimeMillis();
+    _random = new Random(seed);
+    System.out.printf("Record random seed: %d to reproduce test results upon failure\n", seed);
   }
 
   @AfterSuite
@@ -229,18 +232,15 @@ public class BaseTableDataManagerTest {
     // With the current parameters, 3k ops take about 15 seconds, create about 90 segments and drop about half of them
     // Running with coverage, it provides complete coverage of the (relevant) lines in OfflineTableDataManager
 
-    Random random = new Random();
     TableDataManager tableDataManager = makeTestableManager();
 
     for (int i = _lo; i <= _hi; i++) {
       final String segName = SEGMENT_PREFIX + i;
-      tableDataManager.addSegment(makeImmutableSegment(segName, random.nextInt()));
+      tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()));
       _allSegManagers.add(_internalSegMap.get(segName));
     }
 
     runStorageServer(numQueryThreads, runTimeSec, tableDataManager);  // replaces segments while online
-
-//    System.out.println("Nops = " + _numQueries + ",nDrops=" + _nDestroys + ",nCreates=" + _allSegments.size());
     tableDataManager.shutDown();
   }
 
@@ -249,14 +249,15 @@ public class BaseTableDataManagerTest {
     // Start 1 helix worker thread and as many query threads as configured.
     List<Thread> queryThreads = new ArrayList<>(numQueryThreads);
     for (int i = 0; i < numQueryThreads; i++) {
-      BaseTableDataManagerTest.TestSegmentUser segUser = new BaseTableDataManagerTest.TestSegmentUser(tableDataManager);
+      BaseTableDataManagerAcquireSegmentTest.TestSegmentUser segUser =
+          new BaseTableDataManagerAcquireSegmentTest.TestSegmentUser(tableDataManager);
       Thread segUserThread = new Thread(segUser);
       queryThreads.add(segUserThread);
       segUserThread.start();
     }
 
-    BaseTableDataManagerTest.TestHelixWorker helixWorker =
-        new BaseTableDataManagerTest.TestHelixWorker(tableDataManager);
+    BaseTableDataManagerAcquireSegmentTest.TestHelixWorker helixWorker =
+        new BaseTableDataManagerAcquireSegmentTest.TestHelixWorker(tableDataManager);
     Thread helixWorkerThread = new Thread(helixWorker);
     helixWorkerThread.start();
     _masterThread = Thread.currentThread();
@@ -325,7 +326,7 @@ public class BaseTableDataManagerTest {
       while (!_closing) {
         try {
           List<SegmentDataManager> segmentDataManagers = null;
-          double probability = RANDOM.nextDouble();
+          double probability = _random.nextDouble();
           if (probability <= ACQUIRE_ALL_PROBABILITY) {
             segmentDataManagers = _tableDataManager.acquireAllSegments();
           } else {
@@ -344,7 +345,7 @@ public class BaseTableDataManagerTest {
           }
           // To simulate real use case, maybe we can add a small percent that is returned right away after pruning?
           try {
-            int sleepTime = RANDOM.nextInt(_maxUseTimeMs - _minUseTimeMs + 1) + _minUseTimeMs;
+            int sleepTime = _random.nextInt(_maxUseTimeMs - _minUseTimeMs + 1) + _minUseTimeMs;
             Thread.sleep(sleepTime);
           } catch (InterruptedException e) {
             _closing = true;
@@ -366,7 +367,7 @@ public class BaseTableDataManagerTest {
       Set<Integer> segmentIds = new HashSet<>(totalSegs);
       final int nSegments = totalSegs * _nSegsPercent / 100;
       while (segmentIds.size() != nSegments) {
-        segmentIds.add(RANDOM.nextInt(totalSegs) + lo);
+        segmentIds.add(_random.nextInt(totalSegs) + lo);
       }
       return segmentIds;
     }
@@ -394,7 +395,7 @@ public class BaseTableDataManagerTest {
     public void run() {
       while (!_closing) {
         try {
-          int nextInt = RANDOM.nextInt(100);
+          int nextInt = _random.nextInt(100);
           if (nextInt < _removePercent) {
             removeSegment();
           } else if (nextInt < _removePercent + _replacePercent) {
@@ -403,7 +404,7 @@ public class BaseTableDataManagerTest {
             addSegment();
           }
           try {
-            int sleepTime = RANDOM.nextInt(_maxSleepMs - _minSleepMs + 1) + _minSleepMs;
+            int sleepTime = _random.nextInt(_maxSleepMs - _minSleepMs + 1) + _minSleepMs;
             Thread.sleep(sleepTime);
           } catch (InterruptedException e) {
             _closing = true;
@@ -419,16 +420,16 @@ public class BaseTableDataManagerTest {
     private void addSegment() {
       final int segmentToAdd = _hi + 1;
       final String segName = SEGMENT_PREFIX + segmentToAdd;
-      _tableDataManager.addSegment(makeImmutableSegment(segName, RANDOM.nextInt()));
+      _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()));
       _allSegManagers.add(_internalSegMap.get(segName));
       _hi = segmentToAdd;
     }
 
     // Replace a segment between _lo and _hi
     private void replaceSegment() {
-      int segToReplace = RANDOM.nextInt(_hi - _lo + 1) + _lo;
+      int segToReplace = _random.nextInt(_hi - _lo + 1) + _lo;
       final String segName = SEGMENT_PREFIX + segToReplace;
-      _tableDataManager.addSegment(makeImmutableSegment(segName, RANDOM.nextInt()));
+      _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()));
       _allSegManagers.add(_internalSegMap.get(segName));
     }
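
The switch from the shared static RANDOM to a seeded per-suite _random above
follows the usual pattern for reproducible randomized tests; a minimal
sketch of the idea:

  long seed = System.currentTimeMillis();
  Random random = new Random(seed);
  // Log the seed so a failing run can be replayed with new Random(seed).
  System.out.printf("Record random seed: %d to reproduce test results upon failure%n", seed);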
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 32edb2e..1a5a3d5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -19,428 +19,382 @@
 package org.apache.pinot.core.data.manager;
 
 import java.io.File;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
-import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.testng.Assert;
-import org.testng.annotations.AfterSuite;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.spi.crypt.PinotCrypter;
+import org.apache.pinot.spi.crypt.PinotCrypterFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.*;
+import static org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY;
+import static org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY;
+import static org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_WAIT_MS_CONFIG_KEY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 
 public class BaseTableDataManagerTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "OfflineTableDataManagerTest");
 
-  private static final Random RANDOM = new Random();
-  private static final String TABLE_NAME = "testTable";
-  private static final String SEGMENT_PREFIX = "segment";
-
-  // Set once for the suite
-  private File _tmpDir;
-
-  // Set once for every test
-  private volatile int _nDestroys;
-  private volatile boolean _closing;
-  private Set<ImmutableSegment> _allSegments = new HashSet<>();
-  private Set<SegmentDataManager> _accessedSegManagers =
-      Collections.newSetFromMap(new ConcurrentHashMap<SegmentDataManager, Boolean>());
-  private Set<SegmentDataManager> _allSegManagers =
-      Collections.newSetFromMap(new ConcurrentHashMap<SegmentDataManager, Boolean>());
-  private AtomicInteger _numQueries = new AtomicInteger(0);
-  private Map<String, ImmutableSegmentDataManager> _internalSegMap;
-  private Throwable _exception;
-  private Thread _masterThread;
-  // Segment numbers in place.
-  // When we add a segment, we add hi+1, and bump _hi.
-  // When we remove a segment, we remove _lo and bump _lo
-  // When we replace a segment, we pick a number between _hi and _lo (inclusive)
-  private volatile int _lo;
-  private volatile int _hi;
-
-  @BeforeSuite
+  private static final String TABLE_NAME = "__table01__";
+
+  @BeforeMethod
   public void setUp()
       throws Exception {
-    _tmpDir = File.createTempFile("OfflineTableDataManagerTest", null);
-    _tmpDir.deleteOnExit();
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+    initSegmentFetcher();
   }
 
-  @AfterSuite
-  public void tearDown() {
-    if (_tmpDir != null) {
-      org.apache.commons.io.FileUtils.deleteQuietly(_tmpDir);
-    }
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
   }
 
-  @BeforeMethod
-  public void beforeMethod() {
-    _nDestroys = 0;
-    _closing = false;
-    _allSegments.clear();
-    _accessedSegManagers.clear();
-    _allSegManagers.clear();
-    _numQueries.set(0);
-    _exception = null;
-    _masterThread = null;
-  }
+  private BaseTableDataManager makeTestableManager() {
+    TableDataManagerConfig config = mock(TableDataManagerConfig.class);
+    when(config.getTableName()).thenReturn(TABLE_NAME);
+    when(config.getDataDir()).thenReturn(new File(TEMP_DIR, TABLE_NAME).getAbsolutePath());
 
-  private TableDataManager makeTestableManager()
-      throws Exception {
-    TableDataManager tableDataManager = new OfflineTableDataManager();
-    TableDataManagerConfig config;
-    {
-      config = mock(TableDataManagerConfig.class);
-      when(config.getTableName()).thenReturn(TABLE_NAME);
-      when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath());
-    }
+    OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
         new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null);
     tableDataManager.start();
-    Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
-    segsMapField.setAccessible(true);
-    _internalSegMap = (Map<String, ImmutableSegmentDataManager>) segsMapField.get(tableDataManager);
     return tableDataManager;
   }
 
-  private ImmutableSegment makeImmutableSegment(String segmentName, int totalDocs) {
-    ImmutableSegment immutableSegment = mock(ImmutableSegment.class);
-    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
-    when(immutableSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
-    when(immutableSegment.getSegmentName()).thenReturn(segmentName);
-    when(immutableSegment.getSegmentMetadata().getTotalDocs()).thenReturn(totalDocs);
-    doAnswer(invocation -> {
-      _nDestroys++;
-      return null;
-    }).when(immutableSegment).destroy();
-    _allSegments.add(immutableSegment);
-    return immutableSegment;
+  @Test
+  public void testReloadSegmentNewData()
+      throws Exception {
+    BaseTableDataManager tmgr = makeTestableManager();
+    File tempRootDir = tmgr.getSegmentDataDir("test-new-data");
+
+    // Create an empty segment and compress it to tar.gz as the one in deep store.
+    // All input and intermediate files are put in the tempRootDir.
+    File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File tempInputDir = new File(tempRootDir, "seg01_input");
+    FileUtils
+        .write(new File(tempInputDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=remove");
+    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+    FileUtils.deleteQuietly(tempInputDir);
+
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath());
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+
+    File indexDir = tmgr.getSegmentDataDir("seg01");
+    FileUtils.write(new File(indexDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=local");
+
+    // Different CRCs leading to segment download.
+    SegmentMetadata llmd = mock(SegmentMetadata.class);
+    when(llmd.getCrc()).thenReturn("10240");
+    when(llmd.getIndexDir()).thenReturn(indexDir);
+
+    tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd, null, false);
+    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+    assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+        .contains("k=remove"));
   }
 
   @Test
-  public void basicTest()
+  public void testReloadSegmentLocalCopy()
       throws Exception {
-    TableDataManager tableDataManager = makeTestableManager();
-    Assert.assertEquals(tableDataManager.getNumSegments(), 0);
-    final String segmentName = "TestSegment";
-    final int totalDocs = 23456;
-    // Add the segment, get it for use, remove the segment, and then return it.
-    // Make sure that the segment is not destroyed before return.
-    ImmutableSegment immutableSegment = makeImmutableSegment(segmentName, 
totalDocs);
-    tableDataManager.addSegment(immutableSegment);
-    Assert.assertEquals(tableDataManager.getNumSegments(), 1);
-    SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segmentName);
-    Assert.assertEquals(segmentDataManager.getReferenceCount(), 2);
-    tableDataManager.removeSegment(segmentName);
-    Assert.assertEquals(tableDataManager.getNumSegments(), 0);
-    Assert.assertEquals(segmentDataManager.getReferenceCount(), 1);
-    Assert.assertEquals(_nDestroys, 0);
-    tableDataManager.releaseSegment(segmentDataManager);
-    Assert.assertEquals(segmentDataManager.getReferenceCount(), 0);
-    Assert.assertEquals(_nDestroys, 1);
-
-    // Now the segment should not be available for use. Also, returning a null reader is fine
-    segmentDataManager = tableDataManager.acquireSegment(segmentName);
-    Assert.assertNull(segmentDataManager);
-    List<SegmentDataManager> segmentDataManagers = 
tableDataManager.acquireAllSegments();
-    Assert.assertEquals(segmentDataManagers.size(), 0);
-
-    // Removing the segment again is fine.
-    tableDataManager.removeSegment(segmentName);
-    Assert.assertEquals(tableDataManager.getNumSegments(), 0);
-
-    // Add a new segment and remove it in order this time.
-    final String anotherSeg = "AnotherSegment";
-    ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs);
-    tableDataManager.addSegment(ix1);
-    Assert.assertEquals(tableDataManager.getNumSegments(), 1);
-    SegmentDataManager sdm1 = tableDataManager.acquireSegment(anotherSeg);
-    Assert.assertNotNull(sdm1);
-    Assert.assertEquals(sdm1.getReferenceCount(), 2);
-    // acquire all segments
-    List<SegmentDataManager> segmentDataManagersList = 
tableDataManager.acquireAllSegments();
-    Assert.assertEquals(segmentDataManagersList.size(), 1);
-    Assert.assertEquals(sdm1.getReferenceCount(), 3);
-    for (SegmentDataManager dataManager : segmentDataManagersList) {
-      tableDataManager.releaseSegment(dataManager);
-    }
-    // count is back to original
-    Assert.assertEquals(sdm1.getReferenceCount(), 2);
-    tableDataManager.releaseSegment(sdm1);
-    Assert.assertEquals(sdm1.getReferenceCount(), 1);
-    // Now replace the segment with another one.
-    ImmutableSegment ix2 = makeImmutableSegment(anotherSeg, totalDocs + 1);
-    tableDataManager.addSegment(ix2);
-    Assert.assertEquals(tableDataManager.getNumSegments(), 1);
-    // Now the previous one should have been destroyed, and
-    Assert.assertEquals(sdm1.getReferenceCount(), 0);
-    verify(ix1, times(1)).destroy();
-    // Delete ix2 without accessing it.
-    SegmentDataManager sdm2 = _internalSegMap.get(anotherSeg);
-    Assert.assertEquals(sdm2.getReferenceCount(), 1);
-    tableDataManager.removeSegment(anotherSeg);
-    Assert.assertEquals(tableDataManager.getNumSegments(), 0);
-    Assert.assertEquals(sdm2.getReferenceCount(), 0);
-    verify(ix2, times(1)).destroy();
-    tableDataManager.shutDown();
+    BaseTableDataManager tmgr = makeTestableManager();
+    File tempRootDir = tmgr.getSegmentDataDir("test-local-copy");
+
+    // Create an empty segment and compress it to tar.gz as the one in deep store.
+    // All input and intermediate files are put in the tempRootDir.
+    File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File tempInputDir = new File(tempRootDir, "seg01_input");
+    FileUtils
+        .write(new File(tempInputDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=remote");
+    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+    FileUtils.deleteQuietly(tempInputDir);
+
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath());
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+
+    File indexDir = tmgr.getSegmentDataDir("seg01");
+    FileUtils.write(new File(indexDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=local");
+
+    // Same CRCs so load the local copy.
+    SegmentMetadata llmd = mock(SegmentMetadata.class);
+    when(llmd.getCrc()).thenReturn("1024");
+    when(llmd.getIndexDir()).thenReturn(indexDir);
+
+    tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd, null, false);
+    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+    assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+        .contains("k=local"));
+  }
+
+  @Test
+  public void testReloadSegmentForceDownload()
+      throws Exception {
+    BaseTableDataManager tmgr = makeTestableManager();
+    File tempRootDir = tmgr.getSegmentDataDir("test-force-download");
+
+    // Create an empty segment and compress it to tar.gz as the one in deep store.
+    // All input and intermediate files are put in the tempRootDir.
+    File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File tempInputDir = new File(tempRootDir, "seg01_input");
+    FileUtils
+        .write(new File(tempInputDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=remote");
+    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+    FileUtils.deleteQuietly(tempInputDir);
+
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath());
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+
+    File indexDir = tmgr.getSegmentDataDir("seg01");
+    FileUtils.write(new File(indexDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=local");
+
+    // Same CRC but force to download
+    SegmentMetadata llmd = mock(SegmentMetadata.class);
+    when(llmd.getCrc()).thenReturn("1024");
+    when(llmd.getIndexDir()).thenReturn(indexDir);
+
+    tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd, null, true);
+    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+    assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+        .contains("k=remote"));
   }
 
-  /*
-   * These tests simulate the access of segments via OfflineTableDataManager.
-   *
-   * It creates 31 segments (0..30) to start with and adds them to the 
tableDataManager (hi = 30, lo = 0)
-   * It spawns 10 "query" threads, and one "helix" thread.
-   *
-   * The query threads pick up a random of 70% the segments and 'get' them, 
wait a random period of time (5 to 80ms)
-   * and then 'release' the segments back, and does this in a continuous loop.
-   *
-   * The helix thread decides to do one of the following:
-   * - Add a segment (hi+1), and bumps hi by 1 (does this 20% of the time)
-   * - Remove a segment (lo) and bumps up lo by 1 (does this 20% of the time)
-   * - Replaces a segment (a randomm one between (lo,hi), 60% of the time)
-   * and then waits for a random of 50-300ms before attempting one of the ops 
again.
-   */
+  @Test
+  public void testAddOrReplaceSegmentNewData()
+      throws Exception {
+    BaseTableDataManager tmgr = makeTestableManager();
+    File tempRootDir = tmgr.getSegmentDataDir("test-new-data");
+
+    // Create an empty segment and compress it to tar.gz as the one in deep store.
+    // All input and intermediate files are put in the tempRootDir.
+    File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File tempInputDir = new File(tempRootDir, "seg01_input");
+    FileUtils.write(new File(tempInputDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01");
+    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+    FileUtils.deleteQuietly(tempInputDir);
+
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath());
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+
+    // Different CRCs leading to segment download.
+    SegmentMetadata llmd = mock(SegmentMetadata.class);
+    when(llmd.getCrc()).thenReturn("10240");
+
+    assertFalse(tmgr.getSegmentDataDir("seg01").exists());
+    tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd);
+    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+    assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+        .contains("docs=0"));
+  }
 
   @Test
-  public void testReplace()
+  public void testAddOrReplaceSegmentNoop()
       throws Exception {
-    _lo = 0;
-    _hi = 30;   // Total number of segments we have in the server.
-    final int numQueryThreads = 10;
-    final int runTimeSec = 20;
-    // With the current parameters, 3k ops take about 15 seconds, create about 
90 segments and drop about half of them
-    // Running with coverage, it provides complete coverage of the (relevant) 
lines in OfflineTableDataManager
-
-    Random random = new Random();
-    TableDataManager tableDataManager = makeTestableManager();
-
-    for (int i = _lo; i <= _hi; i++) {
-      final String segName = SEGMENT_PREFIX + i;
-      tableDataManager.addSegment(makeImmutableSegment(segName, 
random.nextInt()));
-      _allSegManagers.add(_internalSegMap.get(segName));
-    }
+    BaseTableDataManager tmgr = makeTestableManager();
+
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
 
-    runStorageServer(numQueryThreads, runTimeSec, tableDataManager);  // replaces segments while online
+    SegmentMetadata llmd = mock(SegmentMetadata.class);
+    when(llmd.getCrc()).thenReturn("1024");
 
-//    System.out.println("Nops = " + _numQueries + ",nDrops=" + _nDestroys + ",nCreates=" + _allSegments.size());
-    tableDataManager.shutDown();
+    assertFalse(tmgr.getSegmentDataDir("seg01").exists());
+    tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd);
+    // As the CRC is the same, the index dir is left as is, so it doesn't get created by the test.
+    assertFalse(tmgr.getSegmentDataDir("seg01").exists());
   }
 
-  private void runStorageServer(int numQueryThreads, int runTimeSec, 
TableDataManager tableDataManager)
+  @Test
+  public void testAddOrReplaceSegmentRecovered()
       throws Exception {
-    // Start 1 helix worker thread and as many query threads as configured.
-    List<Thread> queryThreads = new ArrayList<>(numQueryThreads);
-    for (int i = 0; i < numQueryThreads; i++) {
-      BaseTableDataManagerTest.TestSegmentUser segUser = new 
BaseTableDataManagerTest.TestSegmentUser(tableDataManager);
-      Thread segUserThread = new Thread(segUser);
-      queryThreads.add(segUserThread);
-      segUserThread.start();
-    }
+    BaseTableDataManager tmgr = makeTestableManager();
 
-    BaseTableDataManagerTest.TestHelixWorker helixWorker =
-        new BaseTableDataManagerTest.TestHelixWorker(tableDataManager);
-    Thread helixWorkerThread = new Thread(helixWorker);
-    helixWorkerThread.start();
-    _masterThread = Thread.currentThread();
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    // Make this equal to the default crc value, so no need to make a dummy creation.meta file.
+    when(zkmd.getCrc()).thenReturn(Long.MIN_VALUE);
 
-    try {
-      Thread.sleep(runTimeSec * 1000);
-    } catch (InterruptedException e) {
+    File backup = tmgr.getSegmentDataDir("seg01" + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    FileUtils.write(new File(backup, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01");
 
-    }
-    _closing = true;
+    assertFalse(tmgr.getSegmentDataDir("seg01").exists());
+    tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, null);
+    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+    assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+        .contains("docs=0"));
+  }
 
-    helixWorkerThread.join();
-    for (Thread t : queryThreads) {
-      t.join();
-    }
+  @Test
+  public void testAddOrReplaceSegmentNotRecovered()
+      throws Exception {
+    BaseTableDataManager tmgr = makeTestableManager();
+    File tempRootDir = tmgr.getSegmentDataDir("test-force-download");
+
+    // Create an empty segment and compress it to tar.gz as the one in deep store.
+    // All input and intermediate files are put in the tempRootDir.
+    File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File tempInputDir = new File(tempRootDir, "seg01_input");
+    FileUtils
+        .write(new File(tempInputDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=remote");
+    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+    FileUtils.deleteQuietly(tempInputDir);
+
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath());
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+
+    // Though the segment can be recovered from backup, its CRC is different. The local CRC is Long.MIN_VALUE.
+    File backup = tmgr.getSegmentDataDir("seg01" + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    FileUtils.write(new File(backup, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=local");
+
+    assertFalse(tmgr.getSegmentDataDir("seg01").exists());
+    tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, null);
+    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+    assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+        .contains("k=remote"));
+  }
 
-    if (_exception != null) {
-      Assert.fail("One of the threads failed", _exception);
-    }
+  @Test
+  public void testDownloadAndDecrypt()
+      throws Exception {
+    File tempInput = new File(TEMP_DIR, "tmp.txt");
+    FileUtils.write(tempInput, "this is from somewhere remote");
 
-    // tableDataManager should be quiescent now.
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getDownloadUrl()).thenReturn("file://" + tempInput.getAbsolutePath());
 
-    // All segments we ever created must have a corresponding segment manager.
-    Assert.assertEquals(_allSegManagers.size(), _allSegments.size());
+    BaseTableDataManager tmgr = makeTestableManager();
+    File tempRootDir = tmgr.getSegmentDataDir("test-download-decrypt");
 
-    final int nSegsAcccessed = _accessedSegManagers.size();
-    for (SegmentDataManager segmentDataManager : _internalSegMap.values()) {
-      Assert.assertEquals(segmentDataManager.getReferenceCount(), 1);
-      // We should never have called destroy on these segments. Remove it from the list of accessed segments.
-      verify(segmentDataManager.getSegment(), never()).destroy();
-      _allSegManagers.remove(segmentDataManager);
-      _accessedSegManagers.remove(segmentDataManager);
-    }
+    File tarFile = tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir);
+    assertEquals(FileUtils.readFileToString(tarFile), "this is from somewhere remote");
 
-    // For the remaining segments in accessed list, destroy must have been called exactly once.
-    for (SegmentDataManager segmentDataManager : _allSegManagers) {
-      verify(segmentDataManager.getSegment(), times(1)).destroy();
-      // Also their count should be 0
-      Assert.assertEquals(segmentDataManager.getReferenceCount(), 0);
-    }
+    when(zkmd.getCrypterName()).thenReturn("fakePinotCrypter");
+    tarFile = tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir);
+    assertEquals(FileUtils.readFileToString(tarFile), "this is from somewhere remote");
 
-    // The number of segments we accessed must be <= total segments created.
-    Assert.assertTrue(nSegsAcccessed <= _allSegments.size(),
-        "Accessed=" + nSegsAcccessed + ",created=" + _allSegments.size());
-    // The number of segments we have seen and that are not there anymore, must be <= number destroyed.
-    Assert.assertTrue(_accessedSegManagers.size() <= _nDestroys,
-        "SeenButUnavailableNow=" + _accessedSegManagers.size() + ",Destroys=" + _nDestroys);
+    FakePinotCrypter fakeCrypter = (FakePinotCrypter) PinotCrypterFactory.create("fakePinotCrypter");
+    assertTrue(fakeCrypter._origFile.getAbsolutePath().endsWith("__table01__/test-download-decrypt/seg01.tar.gz.enc"));
+    assertTrue(fakeCrypter._decFile.getAbsolutePath().endsWith("__table01__/test-download-decrypt/seg01.tar.gz"));
 
-    // The current number of segments must be the as expected (hi-lo+1)
-    Assert.assertEquals(_internalSegMap.size(), _hi - _lo + 1);
+    try {
+      // Set maxRetry to 0 to cause retry failure immediately.
+      Map<String, Object> properties = new HashMap<>();
+      properties.put(RETRY_COUNT_CONFIG_KEY, 0);
+      SegmentFetcherFactory.init(new PinotConfiguration(properties));
+      tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir);
+      fail();
+    } catch (AttemptsExceededException e) {
+      assertEquals(e.getMessage(), "Operation failed after 0 attempts");
+    }
   }
 
-  private class TestSegmentUser implements Runnable {
-    private static final double ACQUIRE_ALL_PROBABILITY = 0.20;
-    private final int _minUseTimeMs = 5;
-    private final int _maxUseTimeMs = 80;
-    private final int _nSegsPercent = 70; // We use 70% of the segments for 
any query.
-    private final TableDataManager _tableDataManager;
-
-    private TestSegmentUser(TableDataManager tableDataManager) {
-      _tableDataManager = tableDataManager;
-    }
+  @Test
+  public void testUntarAndMoveSegment()
+      throws IOException {
+    BaseTableDataManager tmgr = makeTestableManager();
+    File tempRootDir = tmgr.getSegmentDataDir("test-untar-move");
+
+    // All input and intermediate files are put in the tempRootDir.
+    File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File tempInputDir = new File(tempRootDir, "seg01_input");
+    FileUtils.write(new File(tempInputDir, "tmp.txt"), "this is in segment dir");
+    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+    FileUtils.deleteQuietly(tempInputDir);
+
+    // The destination is the segment directory at the same level as tempRootDir.
+    File indexDir = tmgr.untarAndMoveSegment("seg01", tempTar, tempRootDir);
+    assertEquals(indexDir, tmgr.getSegmentDataDir("seg01"));
+    assertEquals(FileUtils.readFileToString(new File(indexDir, "tmp.txt")), "this is in segment dir");
 
-    @Override
-    public void run() {
-      while (!_closing) {
-        try {
-          List<SegmentDataManager> segmentDataManagers = null;
-          double probability = RANDOM.nextDouble();
-          if (probability <= ACQUIRE_ALL_PROBABILITY) {
-            segmentDataManagers = _tableDataManager.acquireAllSegments();
-          } else {
-            Set<Integer> segmentIds = pickSegments();
-            List<String> segmentList = new ArrayList<>(segmentIds.size());
-            for (Integer segmentId : segmentIds) {
-              segmentList.add(SEGMENT_PREFIX + segmentId);
-            }
-            segmentDataManagers = 
_tableDataManager.acquireSegments(segmentList);
-          }
-          // Some of them may be rejected, but that is OK.
-
-          // Keep track of all segment data managers we ever accessed.
-          for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-            _accessedSegManagers.add(segmentDataManager);
-          }
-          // To simulate real use case, may be we can add a small percent that 
is returned right away after pruning?
-          try {
-            int sleepTime = RANDOM.nextInt(_maxUseTimeMs - _minUseTimeMs + 1) 
+ _minUseTimeMs;
-            Thread.sleep(sleepTime);
-          } catch (InterruptedException e) {
-            _closing = true;
-          }
-          for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-            _tableDataManager.releaseSegment(segmentDataManager);
-          }
-        } catch (Throwable t) {
-          _masterThread.interrupt();
-          _exception = t;
-        }
-      }
+    try {
+      tmgr.untarAndMoveSegment("seg01", new File(tempRootDir, "unknown.txt"), TEMP_DIR);
+      fail();
+    } catch (Exception e) {
+      // expected.
     }
+  }
 
-    private Set<Integer> pickSegments() {
-      int hi = _hi;
-      int lo = _lo;
-      int totalSegs = hi - lo + 1;
-      Set<Integer> segmentIds = new HashSet<>(totalSegs);
-      final int nSegments = totalSegs * _nSegsPercent / 100;
-      while (segmentIds.size() != nSegments) {
-        segmentIds.add(RANDOM.nextInt(totalSegs) + lo);
-      }
-      return segmentIds;
-    }
+  @Test
+  public void testIsNewSegmentMetadata()
+      throws IOException {
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+    assertTrue(BaseTableDataManager.isNewSegment(zkmd, null));
+
+    SegmentMetadata llmd = mock(SegmentMetadata.class);
+    when(llmd.getCrc()).thenReturn("1024");
+    assertFalse(BaseTableDataManager.isNewSegment(zkmd, llmd));
+
+    llmd = mock(SegmentMetadata.class);
+    when(llmd.getCrc()).thenReturn("10245");
+    assertTrue(BaseTableDataManager.isNewSegment(zkmd, llmd));
   }
 
-  private class TestHelixWorker implements Runnable {
-    private final int _removePercent;
-    private final int _replacePercent;
-    private final int _addPercent;
-    private final int _minSleepMs;
-    private final int _maxSleepMs;
-    private final TableDataManager _tableDataManager;
-
-    private TestHelixWorker(TableDataManager tableDataManager) {
-      _tableDataManager = tableDataManager;
-
-      _removePercent = 20;
-      _addPercent = 20;
-      _replacePercent = 60;
-      _minSleepMs = 50;
-      _maxSleepMs = 300;
-    }
+  // Has to be public class for the class loader to work.
+  public static class FakePinotCrypter implements PinotCrypter {
+    private File _origFile;
+    private File _decFile;
 
     @Override
-    public void run() {
-      while (!_closing) {
-        try {
-          int nextInt = RANDOM.nextInt(100);
-          if (nextInt < _removePercent) {
-            removeSegment();
-          } else if (nextInt < _removePercent + _replacePercent) {
-            replaceSegment();
-          } else {
-            addSegment();
-          }
-          try {
-            int sleepTime = RANDOM.nextInt(_maxSleepMs - _minSleepMs + 1) + _minSleepMs;
-            Thread.sleep(sleepTime);
-          } catch (InterruptedException e) {
-            _closing = true;
-          }
-        } catch (Throwable t) {
-          _masterThread.interrupt();
-          _exception = t;
-        }
-      }
+    public void init(PinotConfiguration config) {
     }
 
-    // Add segment _hi + 1,bump hi.
-    private void addSegment() {
-      final int segmentToAdd = _hi + 1;
-      final String segName = SEGMENT_PREFIX + segmentToAdd;
-      _tableDataManager.addSegment(makeImmutableSegment(segName, RANDOM.nextInt()));
-      _allSegManagers.add(_internalSegMap.get(segName));
-      _hi = segmentToAdd;
+    @Override
+    public void encrypt(File origFile, File encFile) {
     }
 
-    // Replace a segment between _lo and _hi
-    private void replaceSegment() {
-      int segToReplace = RANDOM.nextInt(_hi - _lo + 1) + _lo;
-      final String segName = SEGMENT_PREFIX + segToReplace;
-      _tableDataManager.addSegment(makeImmutableSegment(segName, RANDOM.nextInt()));
-      _allSegManagers.add(_internalSegMap.get(segName));
+    @Override
+    public void decrypt(File origFile, File decFile) {
+      _origFile = origFile;
+      _decFile = decFile;
     }
+  }
 
-    // Remove the segment _lo and then bump _lo
-    private void removeSegment() {
-      // Keep at least one segment in place.
-      if (_hi > _lo) {
-        _tableDataManager.removeSegment(SEGMENT_PREFIX + _lo);
-        _lo++;
-      } else {
-        addSegment();
-      }
-    }
+  private static void initSegmentFetcher()
+      throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(RETRY_COUNT_CONFIG_KEY, 3);
+    properties.put(RETRY_WAIT_MS_CONFIG_KEY, 100);
+    properties.put(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, 5);
+    SegmentFetcherFactory.init(new PinotConfiguration(properties));
+
+    // Setup crypter
+    properties.put("class.fakePinotCrypter", FakePinotCrypter.class.getName());
+    PinotCrypterFactory.init(new PinotConfiguration(properties));
+  }
+
+  private static IndexLoadingConfig newDummyIndexLoadingConfig() {
+    IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
+    when(indexLoadingConfig.getReadMode()).thenReturn(ReadMode.mmap);
+    when(indexLoadingConfig.getSegmentVersion()).thenReturn(SegmentVersion.v3);
+    return indexLoadingConfig;
   }
 }
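
A note on the CRC contract pinned down by testIsNewSegmentMetadata above: a segment counts as
new when there is no local copy, or when the ZK CRC differs from the local one. A minimal
sketch of that check (illustrative only; the real BaseTableDataManager.isNewSegment may differ
in details):

    static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata) {
      // No local copy yet: treat the segment as new so it gets downloaded.
      if (localMetadata == null) {
        return true;
      }
      // Different CRCs mean the deep-store copy was refreshed after the local load.
      return zkMetadata.getCrc() != Long.parseLong(localMetadata.getCrc());
    }
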
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
new file mode 100644
index 0000000..e2afd3c
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.HLCSegmentName;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class RealtimeTableDataManagerTest {
+  @Test
+  public void testAllowDownload() {
+    RealtimeTableDataManager mgr = new RealtimeTableDataManager(null);
+
+    String groupId = "myTable_REALTIME_1234567_0";
+    String partitionRange = "ALL";
+    String sequenceNumber = "1234567";
+    HLCSegmentName hlc = new HLCSegmentName(groupId, partitionRange, sequenceNumber);
+    assertFalse(mgr.allowDownload(hlc.getSegmentName(), null));
+
+    LLCSegmentName llc = new LLCSegmentName("tbl01", 0, 1000000, System.currentTimeMillis());
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getStatus()).thenReturn(Status.IN_PROGRESS);
+    assertFalse(mgr.allowDownload(llc.getSegmentName(), zkmd));
+
+    when(zkmd.getStatus()).thenReturn(Status.DONE);
+    when(zkmd.getDownloadUrl()).thenReturn("");
+    assertFalse(mgr.allowDownload(llc.getSegmentName(), zkmd));
+
+    when(zkmd.getDownloadUrl()).thenReturn("remote");
+    assertTrue(mgr.allowDownload(llc.getSegmentName(), zkmd));
+  }
+}
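
The download gating this test encodes, as a sketch (the HLC short-circuit via SegmentName is an
assumption about how the check is implemented; the asserted behavior is what matters):

    boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
      // HLC segments are never re-downloadable from deep store.
      if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
        return false;
      }
      // LLC segments qualify once committed (DONE) with a non-empty download URL in ZK.
      return zkMetadata.getStatus() == Status.DONE && !zkMetadata.getDownloadUrl().isEmpty();
    }
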
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 4078bda..36634db 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -27,11 +27,14 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.Pair;
 
 
@@ -79,6 +82,27 @@ public interface TableDataManager {
       throws Exception;
 
   /**
+   * Reloads an existing immutable segment for the table, which can be an OFFLINE or REALTIME table.
+   * A new segment may be downloaded if the local one has a different CRC, and the download can be
+   * forced by setting the forceDownload flag. This operation is conducted within a failure handling
+   * framework and made transparent to ongoing queries, because the segment is in online serving state.
+   */
+  void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata,
+      SegmentMetadata localMetadata, @Nullable Schema schema, boolean forceDownload)
+      throws Exception;
+
+  /**
+   * Adds or replaces an immutable segment for the table, which can be an OFFLINE or REALTIME table.
+   * A new segment may be downloaded if the local one has a different CRC or doesn't work as expected.
+   * This operation is conducted outside the failure handling framework used in segment reloading,
+   * because the segment is not yet serving queries online; e.g. this method is used to add a new
+   * segment, or to transition a segment to the online serving state.
+   */
+  void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata,
+      @Nullable SegmentMetadata localMetadata)
+      throws Exception;
+
+  /**
    * Removes a segment from the table.
    */
   void removeSegment(String segmentName);
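
To make the division of labor concrete, a hypothetical caller of the two new interface methods
(variable names here are made up; only the two method signatures come from the interface above):

    if (localMetadata != null && segmentIsServingQueries) {
      // In-place refresh of a serving segment, guarded by the failure handling framework.
      tableDataManager.reloadSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata, schema,
          forceDownload);
    } else {
      // First load, or an OFFLINE -> ONLINE Helix transition; localMetadata may be null here.
      tableDataManager.addOrReplaceSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata);
    }
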
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index d83ed50..e58d020 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -54,6 +54,7 @@ import org.apache.pinot.common.restlet.resources.SystemResourceInfo;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.ServiceStatus.Status;
 import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -70,6 +71,7 @@ import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.server.starter.ServerInstance;
 import org.apache.pinot.server.starter.ServerQueriesDisabledTracker;
+import org.apache.pinot.spi.crypt.PinotCrypterFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.environmentprovider.PinotEnvironmentProvider;
 import org.apache.pinot.spi.environmentprovider.PinotEnvironmentProviderFactory;
@@ -375,10 +377,9 @@ public abstract class BaseServerStarter implements ServiceStartable {
     _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager);
     ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
-    SegmentFetcherAndLoader fetcherAndLoader =
-        new SegmentFetcherAndLoader(_serverConf, instanceDataManager, serverMetrics);
+    initSegmentFetcher(_serverConf);
     StateModelFactory<?> stateModelFactory =
-        new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, fetcherAndLoader);
+        new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
     _helixManager.getStateMachineEngine()
         .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
     // Start the server instance as a pre-connect callback so that it starts after connecting to the ZK in order to
@@ -448,7 +449,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
 
     // Register message handler factory
     SegmentMessageHandlerFactory messageHandlerFactory =
-        new SegmentMessageHandlerFactory(fetcherAndLoader, instanceDataManager, serverMetrics);
+        new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics);
     _helixManager.getMessagingService()
         .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory);
 
@@ -708,4 +709,23 @@ public abstract class BaseServerStarter implements ServiceStartable {
     instanceConfig.getRecord().setMapField(Helix.Instance.SYSTEM_RESOURCE_INFO_KEY, systemResourceMap);
     helixAdmin.setInstanceConfig(helixClusterName, instanceId, instanceConfig);
   }
+
+  /**
+   * Initializes the components used to download segments from deep store. They used to be
+   * initialized in SegmentFetcherAndLoader, which has been removed to consolidate segment
+   * download functionality for both OFFLINE and REALTIME tables, so those components are
+   * now initialized here, where SegmentFetcherAndLoader used to be created.
+   */
+  private void initSegmentFetcher(PinotConfiguration config)
+      throws Exception {
+    PinotConfiguration pinotFSConfig = config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
+    PinotFSFactory.init(pinotFSConfig);
+
+    PinotConfiguration segmentFetcherFactoryConfig =
+        config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY);
+    SegmentFetcherFactory.init(segmentFetcherFactoryConfig);
+
+    PinotConfiguration pinotCrypterConfig = config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
+    PinotCrypterFactory.init(pinotCrypterConfig);
+  }
 }
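
For context, a sketch of how the server config feeds these factories; each one sees only its
own subset of keys. The literal key below is an assumption, composed from the retry settings
exercised in the tests; the authoritative prefixes are the CommonConstants.Server constants:

    Map<String, Object> props = new HashMap<>();
    props.put("pinot.server.segment.fetcher.retry.count", 3);  // hypothetical key
    PinotConfiguration serverConf = new PinotConfiguration(props);
    SegmentFetcherFactory.init(serverConf.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY));
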
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 78aee36..3cf0bbb 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -32,11 +32,11 @@ import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -47,11 +47,8 @@ import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
-import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -64,8 +61,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * The class <code>HelixInstanceDataManager</code> is the instance data manager based on Helix.
- *
- * TODO: move SegmentFetcherAndLoader into this class to make this the top level manager
  */
 @ThreadSafe
 public class HelixInstanceDataManager implements InstanceDataManager {
@@ -192,7 +187,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   }
 
   @Override
-  public void reloadSegment(String tableNameWithType, String segmentName)
+  public void reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload)
       throws Exception {
     LOGGER.info("Reloading single segment: {} in table: {}", segmentName, tableNameWithType);
     SegmentMetadata segmentMetadata = getSegmentMetadata(tableNameWithType, segmentName);
@@ -206,13 +201,13 @@ public class HelixInstanceDataManager implements InstanceDataManager {
 
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);
 
-    reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema);
+    reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload);
 
     LOGGER.info("Reloaded single segment: {} in table: {}", segmentName, tableNameWithType);
   }
 
   @Override
-  public void reloadAllSegments(String tableNameWithType)
+  public void reloadAllSegments(String tableNameWithType, boolean forceDownload)
       throws Exception {
     LOGGER.info("Reloading all segments in table: {}", tableNameWithType);
     TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
@@ -221,17 +216,18 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);
 
     for (SegmentMetadata segmentMetadata : getAllSegmentsMetadata(tableNameWithType)) {
-      reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema);
+      reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload);
     }
 
     LOGGER.info("Reloaded all segments in table: {}", tableNameWithType);
   }
 
   private void reloadSegment(String tableNameWithType, SegmentMetadata segmentMetadata, TableConfig tableConfig,
-      @Nullable Schema schema)
+      @Nullable Schema schema, boolean forceDownload)
       throws Exception {
     String segmentName = segmentMetadata.getName();
-    LOGGER.info("Reloading segment: {} in table: {}", segmentName, tableNameWithType);
+    LOGGER.info("Reloading segment: {} in table: {} with forceDownload: {}", segmentName, tableNameWithType,
+        forceDownload);
 
     TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
     if (tableDataManager == null) {
@@ -259,53 +255,49 @@ public class HelixInstanceDataManager implements InstanceDataManager {
       return;
     }
 
-    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
-    File parentFile = indexDir.getParentFile();
-    File segmentBackupDir =
-        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    SegmentZKMetadata zkMetadata =
+        ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
+    Preconditions.checkNotNull(zkMetadata);
 
     // This method might modify the file on disk. Use segment lock to prevent race condition
     Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName);
     try {
       segmentLock.lock();
 
-      // First rename index directory to segment backup directory so that original segment have all file descriptors
-      // point to the segment backup directory to ensure original segment serves queries properly
-
-      // Rename index directory to segment backup directory (atomic)
-      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
-          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
+      // Reloads an existing segment; the local segment metadata exists, as asserted above.
+      tableDataManager.reloadSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig),
+          zkMetadata, segmentMetadata, schema, forceDownload);
+      LOGGER.info("Reloaded segment: {} of table: {}", segmentName, tableNameWithType);
+    } finally {
+      segmentLock.unlock();
+    }
+  }
 
-      // Copy from segment backup directory back to index directory
-      FileUtils.copyDirectory(segmentBackupDir, indexDir);
+  @Override
+  public void addOrReplaceSegment(String tableNameWithType, String segmentName)
+      throws Exception {
+    LOGGER.info("Adding or replacing segment: {} for table: {}", segmentName, tableNameWithType);
 
-      // Load from index directory
-      ImmutableSegment immutableSegment = ImmutableSegmentLoader
-          .load(indexDir, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig), schema);
+    // Get updated table config and segment metadata from Zookeeper.
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+    Preconditions.checkNotNull(tableConfig);
+    SegmentZKMetadata zkMetadata =
+        ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
+    Preconditions.checkNotNull(zkMetadata);
 
-      // Replace the old segment in memory
-      tableDataManager.addSegment(immutableSegment);
+    // This method might modify the file on disk. Use segment lock to prevent race condition
+    Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName);
+    try {
+      segmentLock.lock();
 
-      // Rename segment backup directory to segment temporary directory (atomic)
-      // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
-      // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so
-      // that we can safely delete the segment temporary directory
-      File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
-      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
-          "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir,
-          segmentTempDir);
-      LOGGER.info("Reloaded segment: {} in table: {}", segmentName, tableNameWithType);
+      // If the table manager has not been created, or the segment is not loaded yet, localMetadata
+      // is set to null, and addOrReplaceSegment will load the segment accordingly.
+      SegmentMetadata localMetadata = getSegmentMetadata(tableNameWithType, segmentName);
 
-      // Delete segment temporary directory
-      FileUtils.deleteDirectory(segmentTempDir);
-    } catch (Exception reloadFailureException) {
-      try {
-        LoaderUtils.reloadFailureRecovery(indexDir);
-      } catch (Exception recoveryFailureException) {
-        LOGGER.error("Failed to recover after reload failure", recoveryFailureException);
-        reloadFailureException.addSuppressed(recoveryFailureException);
-      }
-      throw reloadFailureException;
+      _tableDataManagerMap.computeIfAbsent(tableNameWithType, k -> createTableDataManager(k, tableConfig))
+          .addOrReplaceSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig),
+              zkMetadata, localMetadata);
+      LOGGER.info("Added or replaced segment: {} of table: {}", segmentName, tableNameWithType);
     } finally {
       segmentLock.unlock();
     }
@@ -361,9 +353,13 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     }
   }
 
+  /**
+   * Assembles the path to the segment directory directly, for use when the table manager
+   * has not been created for the given table yet.
+   */
   @Override
-  public String getSegmentDataDirectory() {
-    return _instanceDataManagerConfig.getInstanceDataDir();
+  public File getSegmentDataDirectory(String tableNameWithType, String segmentName) {
+    return new File(new File(_instanceDataManagerConfig.getInstanceDataDir(), tableNameWithType), segmentName);
   }
 
   @Override
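
Both reloadSegment and addOrReplaceSegment serialize on-disk mutation through one lock per
(table, segment) pair, via SegmentLocks. The general shape of that pattern, as a sketch (as a
side note, the idiomatic form takes the lock before entering try, so the finally-unlock can
never run against an unheld lock):

    Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName);
    segmentLock.lock();
    try {
      // mutate the segment directory, reload, or swap the in-memory segment
    } finally {
      segmentLock.unlock();
    }
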
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
deleted file mode 100644
index 9a4ede4..0000000
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.server.starter.helix;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.locks.Lock;
-import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.Utils;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
-import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.crypt.PinotCrypter;
-import org.apache.pinot.spi.crypt.PinotCrypterFactory;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class SegmentFetcherAndLoader {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentFetcherAndLoader.class);
-
-  private static final String TAR_GZ_SUFFIX = ".tar.gz";
-  private static final String ENCODED_SUFFIX = ".enc";
-
-  private final InstanceDataManager _instanceDataManager;
-  private final ServerMetrics _serverMetrics;
-
-  public SegmentFetcherAndLoader(PinotConfiguration config, InstanceDataManager instanceDataManager,
-      ServerMetrics serverMetrics)
-      throws Exception {
-    _instanceDataManager = instanceDataManager;
-    _serverMetrics = serverMetrics;
-
-    PinotConfiguration pinotFSConfig = config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
-    PinotConfiguration segmentFetcherFactoryConfig =
-        config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY);
-    PinotConfiguration pinotCrypterConfig = config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
-
-    PinotFSFactory.init(pinotFSConfig);
-    SegmentFetcherFactory.init(segmentFetcherFactoryConfig);
-    PinotCrypterFactory.init(pinotCrypterConfig);
-  }
-
-  public void replaceAllOfflineSegments(String tableNameWithType) {
-    LOGGER.info("Replacing all segments in table: {}", tableNameWithType);
-    List<SegmentMetadata> segMds = _instanceDataManager.getAllSegmentsMetadata(tableNameWithType);
-    for (SegmentMetadata segMd : segMds) {
-      addOrReplaceOfflineSegment(tableNameWithType, segMd.getName(), true);
-    }
-    LOGGER.info("Replaced all segments in table: {}", tableNameWithType);
-  }
-
-  public void addOrReplaceOfflineSegment(String tableNameWithType, String segmentName) {
-    addOrReplaceOfflineSegment(tableNameWithType, segmentName, false);
-  }
-
-  /**
-   * Add a new segment or replace an existing segment for offline table. The method checks
-   * the local segment CRC with the one set in ZK by controller. If both equal, the method
-   * simply loads the local segment, otherwise it downloads the new segment then load. When
-   * forceDownload is set to true, the server always downloads the segment.
-   */
-  public void addOrReplaceOfflineSegment(String tableNameWithType, String segmentName, boolean forceDownload) {
-    SegmentZKMetadata newSegmentZKMetadata = ZKMetadataProvider
-        .getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), tableNameWithType, segmentName);
-    Preconditions.checkNotNull(newSegmentZKMetadata);
-
-    LOGGER.info("Adding or replacing segment {} for table {}, metadata {}, downloadIsForced {}", segmentName,
-        tableNameWithType, newSegmentZKMetadata, forceDownload);
-
-    // This method might modify the file on disk. Use segment lock to prevent race condition
-    Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName);
-    try {
-      segmentLock.lock();
-
-      // We lock the segment in order to get its metadata, and then release the lock, so it is possible
-      // that the segment is dropped after we get its metadata.
-      SegmentMetadata localSegmentMetadata = _instanceDataManager.getSegmentMetadata(tableNameWithType, segmentName);
-
-      if (localSegmentMetadata == null) {
-        LOGGER.info("Segment {} of table {} is not loaded in memory, checking disk", segmentName, tableNameWithType);
-        File indexDir = new File(getSegmentLocalDirectory(tableNameWithType, segmentName));
-        // Restart during segment reload might leave segment in inconsistent state (index directory might not exist but
-        // segment backup directory existed), need to first try to recover from reload failure before checking the
-        // existence of the index directory and loading segment metadata from it
-        LoaderUtils.reloadFailureRecovery(indexDir);
-        if (indexDir.exists()) {
-          LOGGER.info("Segment {} of table {} found on disk, attempting to load it", segmentName, tableNameWithType);
-          try {
-            localSegmentMetadata = new SegmentMetadataImpl(indexDir);
-            LOGGER.info("Found segment {} of table {} with crc {} on disk", segmentName, tableNameWithType,
-                localSegmentMetadata.getCrc());
-          } catch (Exception e) {
-            // The localSegmentDir should help us get the table name,
-            LOGGER.error("Failed to load segment metadata from {}. Deleting it.", indexDir, e);
-            FileUtils.deleteQuietly(indexDir);
-            localSegmentMetadata = null;
-          }
-          try {
-            if (!forceDownload && !isNewSegmentMetadata(tableNameWithType, newSegmentZKMetadata,
-                localSegmentMetadata)) {
-              LOGGER.info("Segment metadata same as before, loading {} of table {} (crc {}) from disk", segmentName,
-                  tableNameWithType, localSegmentMetadata.getCrc());
-              _instanceDataManager.addOfflineSegment(tableNameWithType, segmentName, indexDir);
-              // TODO Update zk metadata with CRC for this instance
-              return;
-            }
-          } catch (Exception e) {
-            LOGGER
-                .error("Failed to load {} of table {} from local, will try to reload it from controller!", segmentName,
-                    tableNameWithType, e);
-            FileUtils.deleteQuietly(indexDir);
-            localSegmentMetadata = null;
-          }
-        }
-      }
-      // There is a very unlikely race condition that we may have gotten the metadata of a
-      // segment that was not dropped when we checked, but was dropped after the check above.
-      // That is possible only if we get two helix transitions (to drop, and then to add back) the
-      // segment at the same, or very close to each other.If the race condition triggers, and the
-      // two segments are same in metadata, then we may end up NOT adding back the segment
-      // that is in the process of being dropped.
-
-      // If we get here, then either it is the case that we have the segment loaded in memory (and therefore present
-      // in disk) or, we need to load from the server. In the former case, we still need to check if the metadata
-      // that we have is different from that in zookeeper.
-      if (forceDownload || isNewSegmentMetadata(tableNameWithType, newSegmentZKMetadata, localSegmentMetadata)) {
-        if (forceDownload) {
-          LOGGER.info("Force to download segment {} of table {} from controller.", segmentName, tableNameWithType);
-        } else if (localSegmentMetadata == null) {
-          LOGGER.info("Loading new segment {} of table {} from controller", segmentName, tableNameWithType);
-        } else {
-          LOGGER.info("Trying to refresh segment {} of table {} with new data.", segmentName, tableNameWithType);
-        }
-        String uri = newSegmentZKMetadata.getDownloadUrl();
-        String crypterName = newSegmentZKMetadata.getCrypterName();
-        PinotCrypter crypter = (crypterName != null) ? PinotCrypterFactory.create(crypterName) : null;
-
-        // Retry will be done here.
-        String localSegmentDir = downloadSegmentToLocal(uri, crypter, tableNameWithType, segmentName);
-        SegmentMetadata segmentMetadata = new SegmentMetadataImpl(new File(localSegmentDir));
-        _instanceDataManager.addOfflineSegment(tableNameWithType, segmentName, new File(localSegmentDir));
-        LOGGER.info("Downloaded segment {} of table {} crc {} from controller", segmentName, tableNameWithType,
-            segmentMetadata.getCrc());
-      } else {
-        LOGGER.info("Got already loaded segment {} of table {} crc {} again, will do nothing.", segmentName,
-            tableNameWithType, localSegmentMetadata.getCrc());
-      }
-    } catch (final Exception e) {
-      LOGGER.error("Cannot load segment : " + segmentName + " for table " + tableNameWithType, e);
-      Utils.rethrowException(e);
-      throw new AssertionError("Should not reach this");
-    } finally {
-      segmentLock.unlock();
-    }
-  }
-
-  private boolean isNewSegmentMetadata(String tableNameWithType, SegmentZKMetadata newSegmentZKMetadata,
-      @Nullable SegmentMetadata existedSegmentMetadata) {
-    String segmentName = newSegmentZKMetadata.getSegmentName();
-
-    if (existedSegmentMetadata == null) {
-      LOGGER.info("Existed segment metadata is null for segment: {} in table: {}", segmentName, tableNameWithType);
-      return true;
-    }
-
-    long newCrc = newSegmentZKMetadata.getCrc();
-    long existedCrc = Long.valueOf(existedSegmentMetadata.getCrc());
-    LOGGER.info("New segment CRC: {}, existed segment CRC: {} for segment: {} in table: {}", newCrc, existedCrc,
-        segmentName, tableNameWithType);
-    return newCrc != existedCrc;
-  }
-
-  private String downloadSegmentToLocal(String uri, PinotCrypter crypter, String tableName, String segmentName)
-      throws Exception {
-    File tempDir = new File(new File(_instanceDataManager.getSegmentFileDirectory(), tableName),
-        "tmp-" + segmentName + "-" + UUID.randomUUID());
-    FileUtils.forceMkdir(tempDir);
-    File tempDownloadFile = new File(tempDir, segmentName + ENCODED_SUFFIX);
-    File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX);
-    File tempSegmentDir = new File(tempDir, segmentName);
-    try {
-      try {
-        SegmentFetcherFactory.fetchSegmentToLocal(uri, tempDownloadFile);
-        if (crypter != null) {
-          crypter.decrypt(tempDownloadFile, tempTarFile);
-        } else {
-          tempTarFile = tempDownloadFile;
-        }
-        LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName,
-            tableName, uri, tempTarFile, tempTarFile.length());
-      } catch (AttemptsExceededException e) {
-        LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName,
-            tableName, uri, tempTarFile);
-        _serverMetrics.addMeteredTableValue(tableName, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
-        Utils.rethrowException(e);
-        return null;
-      }
-
-      try {
-        // If an exception is thrown when untarring, it means the tar file is broken OR not found after the retry.
-        // Thus, there's no need to retry again.
-        File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0);
-        File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName);
-        if (indexDir.exists()) {
-          LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName);
-          FileUtils.deleteDirectory(indexDir);
-        }
-        FileUtils.moveDirectory(tempIndexDir, indexDir);
-        LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir);
-        return indexDir.getAbsolutePath();
-      } catch (Exception e) {
-        LOGGER.error("Exception when untarring segment: {} for table: {} from {} to {}", segmentName, tableName,
-            tempTarFile, tempSegmentDir);
-        _serverMetrics.addMeteredTableValue(tableName, ServerMeter.UNTAR_FAILURES, 1L);
-        Utils.rethrowException(e);
-        return null;
-      }
-    } finally {
-      FileUtils.deleteQuietly(tempDir);
-    }
-  }
-
-  public String getSegmentLocalDirectory(String tableName, String segmentId) {
-    return _instanceDataManager.getSegmentDataDirectory() + "/" + tableName + "/" + segmentId;
-  }
-}
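
The downloadSegmentToLocal pipeline deleted above lives on in BaseTableDataManager, split into
the two steps the new tests exercise; condensed (a sketch, not the exact signatures):

    // 1) Fetch from the download URL and, if a crypter is configured, decrypt into a tar.gz file.
    File tarFile = tmgr.downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
    // 2) Untar into a temp dir, then move the result into the table's segment data directory.
    File indexDir = tmgr.untarAndMoveSegment(segmentName, tarFile, tempRootDir);
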
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index 97f1d23..ad4a386 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -40,14 +40,10 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
   // We only allow limited number of segments refresh/reload happen at the same time
   // The reason for that is segment refresh/reload will temporarily use double-sized memory
   private final Semaphore _refreshThreadSemaphore;
-
-  private final SegmentFetcherAndLoader _fetcherAndLoader;
   private final InstanceDataManager _instanceDataManager;
   private final ServerMetrics _metrics;
 
-  public SegmentMessageHandlerFactory(SegmentFetcherAndLoader fetcherAndLoader, InstanceDataManager instanceDataManager,
-      ServerMetrics metrics) {
-    _fetcherAndLoader = fetcherAndLoader;
+  public SegmentMessageHandlerFactory(InstanceDataManager instanceDataManager, ServerMetrics metrics) {
     _instanceDataManager = instanceDataManager;
     _metrics = metrics;
     int maxParallelRefreshThreads = instanceDataManager.getMaxParallelRefreshThreads();
@@ -119,7 +115,7 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
       try {
         acquireSema(_segmentName, _logger);
         // The number of retry times depends on the retry count in Constants.
-        _fetcherAndLoader.addOrReplaceOfflineSegment(_tableNameWithType, _segmentName);
+        _instanceDataManager.addOrReplaceSegment(_tableNameWithType, _segmentName);
         result.setSuccess(true);
       } catch (Exception e) {
         _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REFRESH_FAILURES, 1);
@@ -150,19 +146,11 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
           acquireSema("ALL", _logger);
           // NOTE: the method aborts if any segment reload encounters an unhandled exception,
           // and can lead to inconsistent state across segments
-          if (_forceDownload) {
-            _fetcherAndLoader.replaceAllOfflineSegments(_tableNameWithType);
-          } else {
-            _instanceDataManager.reloadAllSegments(_tableNameWithType);
-          }
+          _instanceDataManager.reloadAllSegments(_tableNameWithType, _forceDownload);
         } else {
           // Reload one segment
           acquireSema(_segmentName, _logger);
-          if (_forceDownload) {
-            _fetcherAndLoader.addOrReplaceOfflineSegment(_tableNameWithType, _segmentName, true);
-          } else {
-            _instanceDataManager.reloadSegment(_tableNameWithType, _segmentName);
-          }
+          _instanceDataManager.reloadSegment(_tableNameWithType, _segmentName, _forceDownload);
         }
         helixTaskResult.setSuccess(true);
       } catch (Throwable e) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 1a5efe5..d2e4c30 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -53,13 +53,10 @@ import org.slf4j.LoggerFactory;
 public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
   private final String _instanceId;
   private final InstanceDataManager _instanceDataManager;
-  private final SegmentFetcherAndLoader _fetcherAndLoader;
 
-  public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
-      SegmentFetcherAndLoader fetcherAndLoader) {
+  public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
     _instanceId = instanceId;
     _instanceDataManager = instanceDataManager;
-    _fetcherAndLoader = fetcherAndLoader;
   }
 
   public static String getStateModelName() {
@@ -162,7 +159,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
         TableType tableType = TableNameBuilder.getTableTypeFromTableName(message.getResourceName());
         Preconditions.checkNotNull(tableType);
         if (tableType == TableType.OFFLINE) {
-          _fetcherAndLoader.addOrReplaceOfflineSegment(tableNameWithType, segmentName);
+          _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName);
         } else {
           _instanceDataManager.addRealtimeSegment(tableNameWithType, segmentName);
         }
@@ -208,7 +205,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       try {
         segmentLock.lock();
 
-        final File segmentDir = new File(_fetcherAndLoader.getSegmentLocalDirectory(tableNameWithType, segmentName));
+        File segmentDir = _instanceDataManager.getSegmentDataDirectory(tableNameWithType, segmentName);
         if (segmentDir.exists()) {
           FileUtils.deleteQuietly(segmentDir);
           _logger.info("Deleted segment directory {}", segmentDir);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2bc0de8..642fa04 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -453,9 +453,11 @@ public class CommonConstants {
     public static class Realtime {
       public enum Status {
         // Means the segment is in CONSUMING state.
-        IN_PROGRESS, // Means the segment is in ONLINE state (segment completed consuming and has been saved in
+        IN_PROGRESS,
+        // Means the segment is in ONLINE state (segment completed consuming and has been saved in
         // segment store).
-        DONE, // Means the segment is uploaded to a Pinot controller by an external party.
+        DONE,
+        // Means the segment is uploaded to a Pinot controller by an external party.
         UPLOADED
       }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
