This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a5f3dc5 Move segment download and segment reload into TableManager (#7319) a5f3dc5 is described below commit a5f3dc507e6441baca35dae5bbfad122356683b6 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Wed Sep 15 09:38:14 2021 -0700 Move segment download and segment reload into TableManager (#7319) Deletes SegmentFetcherAndLoader; and move its functionality to BaseTableManager, preparing to combine the download logic with RealtimeTableManager. Besides, adding UTs. --- .../core/data/manager/BaseTableDataManager.java | 230 +++++++ .../core/data/manager/InstanceDataManager.java | 21 +- .../manager/realtime/RealtimeTableDataManager.java | 25 +- ...=> BaseTableDataManagerAcquireSegmentTest.java} | 37 +- .../data/manager/BaseTableDataManagerTest.java | 664 ++++++++++----------- .../realtime/RealtimeTableDataManagerTest.java | 56 ++ .../local/data/manager/TableDataManager.java | 24 + .../server/starter/helix/BaseServerStarter.java | 28 +- .../starter/helix/HelixInstanceDataManager.java | 96 ++- .../starter/helix/SegmentFetcherAndLoader.java | 257 -------- .../helix/SegmentMessageHandlerFactory.java | 20 +- .../SegmentOnlineOfflineStateModelFactory.java | 9 +- .../apache/pinot/spi/utils/CommonConstants.java | 6 +- 13 files changed, 751 insertions(+), 722 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 2933aa8..99bb521 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -18,32 +18,46 @@ */ package org.apache.pinot.core.data.manager; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.LoadingCache; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; +import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.Pair; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -256,4 +270,220 @@ public abstract class BaseTableDataManager implements TableDataManager { .collect(Collectors.toMap(map -> map.getKey().getSecond(), Map.Entry::getValue)); } } + + @Override + public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata, + SegmentMetadata localMetadata, @Nullable Schema schema, boolean forceDownload) + throws Exception { + File indexDir = localMetadata.getIndexDir(); + Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir); + + File parentFile = indexDir.getParentFile(); + File segmentBackupDir = + new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX); + + try { + // First rename index directory to segment backup directory so that original segment have all file descriptors + // point to the segment backup directory to ensure original segment serves queries properly + + // Rename index directory to segment backup directory (atomic) + Preconditions.checkState(indexDir.renameTo(segmentBackupDir), + "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir); + + // Download from remote or copy from local backup directory into index directory, + // and then continue to load the segment from index directory. + boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata); + if (shouldDownload && allowDownload(segmentName, zkMetadata)) { + if (forceDownload) { + LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType); + } else { + LOGGER.info("Download segment:{} of table: {} as local crc: {} mismatches remote crc: {}", segmentName, + _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc()); + } + indexDir = downloadSegment(segmentName, zkMetadata); + } else { + LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType); + FileUtils.copyDirectory(segmentBackupDir, indexDir); + } + + // Load from index directory and replace the old segment in memory. + addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema)); + + // Rename segment backup directory to segment temporary directory (atomic) + // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we + // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so + // that we can safely delete the segment temporary directory + File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX); + Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir), + "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir, + segmentTempDir); + FileUtils.deleteDirectory(segmentTempDir); + } catch (Exception reloadFailureException) { + try { + LoaderUtils.reloadFailureRecovery(indexDir); + } catch (Exception recoveryFailureException) { + LOGGER.error("Failed to recover after reload failure", recoveryFailureException); + reloadFailureException.addSuppressed(recoveryFailureException); + } + throw reloadFailureException; + } + } + + @Override + public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, + SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata) + throws Exception { + if (!isNewSegment(zkMetadata, localMetadata)) { + LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName, + _tableNameWithType, localMetadata.getCrc()); + return; + } + + // Try to recover if no local metadata is provided. + if (localMetadata == null) { + LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType); + localMetadata = recoverSegmentQuietly(segmentName); + if (!isNewSegment(zkMetadata, localMetadata)) { + LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType, + localMetadata.getCrc()); + if (loadSegmentQuietly(segmentName, indexLoadingConfig)) { + return; + } + // Set local metadata to null to indicate that the local segment fails to load, + // although it exists and has same crc with the remote one. + localMetadata = null; + } + } + + Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download", + segmentName, _tableNameWithType); + + // Download segment and replace the local one, either due to failure to recover local segment, + // or the segment data is updated and has new CRC now. + if (localMetadata == null) { + LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType); + } else { + LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName, + _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc()); + } + File indexDir = downloadSegment(segmentName, zkMetadata); + addSegment(indexDir, indexLoadingConfig); + LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType, + zkMetadata.getCrc()); + } + + protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { + return true; + } + + protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata) + throws Exception { + // TODO: may support download from peer servers for RealTime table. + return downloadSegmentFromDeepStore(segmentName, zkMetadata); + } + + /** + * Server restart during segment reload might leave segment directory in inconsistent state, like the index + * directory might not exist but segment backup directory existed. This method tries to recover from reload + * failure before checking the existence of the index directory and loading segment metadata from it. + */ + private SegmentMetadata recoverSegmentQuietly(String segmentName) { + File indexDir = getSegmentDataDir(segmentName); + try { + LoaderUtils.reloadFailureRecovery(indexDir); + if (!indexDir.exists()) { + LOGGER.info("Segment: {} of table: {} is not found on disk", segmentName, _tableNameWithType); + return null; + } + SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir); + LOGGER.info("Segment: {} of table: {} with crc: {} from disk is ready for loading", segmentName, + _tableNameWithType, localMetadata.getCrc()); + return localMetadata; + } catch (Exception e) { + LOGGER.error("Failed to recover segment: {} of table: {} from disk", segmentName, _tableNameWithType, e); + FileUtils.deleteQuietly(indexDir); + return null; + } + } + + private boolean loadSegmentQuietly(String segmentName, IndexLoadingConfig indexLoadingConfig) { + File indexDir = getSegmentDataDir(segmentName); + try { + addSegment(indexDir, indexLoadingConfig); + LOGGER.info("Loaded segment: {} of table: {} from disk", segmentName, _tableNameWithType); + return true; + } catch (Exception e) { + FileUtils.deleteQuietly(indexDir); + LOGGER.error("Failed to load segment: {} of table: {} from disk", segmentName, _tableNameWithType, e); + return false; + } + } + + private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata zkMetadata) + throws Exception { + File tempRootDir = getSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); + FileUtils.forceMkdir(tempRootDir); + try { + File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir); + return untarAndMoveSegment(segmentName, tarFile, tempRootDir); + } finally { + FileUtils.deleteQuietly(tempRootDir); + } + } + + @VisibleForTesting + File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir) + throws Exception { + File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + String uri = zkMetadata.getDownloadUrl(); + try { + SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName()); + LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, + _tableNameWithType, uri, tarFile, tarFile.length()); + return tarFile; + } catch (AttemptsExceededException e) { + LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName, + _tableNameWithType, uri, tarFile); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L); + throw e; + } + } + + @VisibleForTesting + File untarAndMoveSegment(String segmentName, File tarFile, File tempRootDir) + throws IOException { + File untarDir = new File(tempRootDir, segmentName); + try { + // If an exception is thrown when untarring, it means the tar file is broken + // or not found after the retry. Thus, there's no need to retry again. + File untaredSegDir = TarGzCompressionUtils.untar(tarFile, untarDir).get(0); + LOGGER.info("Uncompressed tar file: {} into target dir: {}", tarFile, untarDir); + // Replace the existing index directory. + File indexDir = getSegmentDataDir(segmentName); + FileUtils.deleteDirectory(indexDir); + FileUtils.moveDirectory(untaredSegDir, indexDir); + LOGGER.info("Successfully downloaded segment: {} of table: {} to index dir: {}", segmentName, _tableNameWithType, + indexDir); + return indexDir; + } catch (Exception e) { + LOGGER.error("Failed to untar segment: {} of table: {} from: {} to: {}", segmentName, _tableNameWithType, tarFile, + untarDir); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UNTAR_FAILURES, 1L); + throw e; + } + } + + @VisibleForTesting + File getSegmentDataDir(String segmentName) { + return new File(_indexDir, segmentName); + } + + @VisibleForTesting + static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata) { + return localMetadata == null || !hasSameCRC(zkMetadata, localMetadata); + } + + private static boolean hasSameCRC(SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata) { + return zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc()); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index 4cb2da2..66074a8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -80,15 +80,26 @@ public interface InstanceDataManager { throws Exception; /** - * Reloads a segment in a table. + * Reloads a segment in a table. This method can download a new segment to replace the local + * one before loading. Download happens when local segment's CRC mismatches the one of + * the remote segment; but can also be forced to do regardless of CRC. */ - void reloadSegment(String tableNameWithType, String segmentName) + void reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload) throws Exception; /** - * Reloads all segment in a table. + * Reloads all segments in a table. */ - void reloadAllSegments(String tableNameWithType) + void reloadAllSegments(String tableNameWithType, boolean forceDownload) + throws Exception; + + /** + * Adds or replaces a segment in a table. Different from segment reloading, this method + * doesn't assume the existence of TableDataManager object and it can actually initialize + * the TableDataManager for the segment. A new segment is downloaded if the local one is + * not working or has a different CRC from the remote one. + */ + void addOrReplaceSegment(String tableNameWithType, String segmentName) throws Exception; /** @@ -116,7 +127,7 @@ public interface InstanceDataManager { /** * Returns the directory for un-tarred segment data. */ - String getSegmentDataDirectory(); + File getSegmentDataDirectory(String tableNameWithType, String segmentName); /** * Returns the directory for tarred segment files. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 931498a..f8d57ff 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -251,15 +251,12 @@ public class RealtimeTableDataManager extends BaseTableDataManager { * This call comes in one of two ways: * For HL Segments: * - We are being directed by helix to own up all the segments that we committed and are still in retention. In - * this case - * we treat it exactly like how OfflineTableDataManager would -- wrap it into an OfflineSegmentDataManager, and - * put it - * in the map. + * this case we treat it exactly like how OfflineTableDataManager would -- wrap it into an + * OfflineSegmentDataManager, and put it in the map. * - We are being asked to own up a new realtime segment. In this case, we wrap the segment with a - * RealTimeSegmentDataManager - * (that kicks off consumption). When the segment is committed we get notified via the notifySegmentCommitted - * call, at - * which time we replace the segment with the OfflineSegmentDataManager + * RealTimeSegmentDataManager (that kicks off consumption). When the segment is committed we get notified via the + * notifySegmentCommitted call, at which time we replace the segment with the OfflineSegmentDataManager + * * For LL Segments: * - We are being asked to start consuming from a partition. * - We did not know about the segment and are being asked to download and own the segment (re-balancing, or @@ -412,7 +409,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager { partitionUpsertMetadataManager.addSegment(immutableSegment, recordInfoIterator); } - public void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata segmentZKMetadata, + @Override + protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) { + // Only LLC immutable segment allows download. + if (SegmentName.isHighLevelConsumerSegmentName(segmentName) || zkMetadata.getStatus() == Status.IN_PROGRESS) { + return false; + } + // TODO: may support download from peer servers as well. + return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl()); + } + + void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) { String uri = segmentZKMetadata.getDownloadUrl(); if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java similarity index 94% copy from pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java copy to pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java index 32edb2e..a34b569 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java @@ -49,14 +49,13 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.*; -public class BaseTableDataManagerTest { - - private static final Random RANDOM = new Random(); +public class BaseTableDataManagerAcquireSegmentTest { private static final String TABLE_NAME = "testTable"; private static final String SEGMENT_PREFIX = "segment"; // Set once for the suite private File _tmpDir; + private Random _random; // Set once for every test private volatile int _nDestroys; @@ -82,6 +81,10 @@ public class BaseTableDataManagerTest { throws Exception { _tmpDir = File.createTempFile("OfflineTableDataManagerTest", null); _tmpDir.deleteOnExit(); + + long seed = System.currentTimeMillis(); + _random = new Random(seed); + System.out.printf("Record random seed: %d to reproduce test results upon failure\n", seed); } @AfterSuite @@ -229,18 +232,15 @@ public class BaseTableDataManagerTest { // With the current parameters, 3k ops take about 15 seconds, create about 90 segments and drop about half of them // Running with coverage, it provides complete coverage of the (relevant) lines in OfflineTableDataManager - Random random = new Random(); TableDataManager tableDataManager = makeTestableManager(); for (int i = _lo; i <= _hi; i++) { final String segName = SEGMENT_PREFIX + i; - tableDataManager.addSegment(makeImmutableSegment(segName, random.nextInt())); + tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt())); _allSegManagers.add(_internalSegMap.get(segName)); } runStorageServer(numQueryThreads, runTimeSec, tableDataManager); // replaces segments while online - -// System.out.println("Nops = " + _numQueries + ",nDrops=" + _nDestroys + ",nCreates=" + _allSegments.size()); tableDataManager.shutDown(); } @@ -249,14 +249,15 @@ public class BaseTableDataManagerTest { // Start 1 helix worker thread and as many query threads as configured. List<Thread> queryThreads = new ArrayList<>(numQueryThreads); for (int i = 0; i < numQueryThreads; i++) { - BaseTableDataManagerTest.TestSegmentUser segUser = new BaseTableDataManagerTest.TestSegmentUser(tableDataManager); + BaseTableDataManagerAcquireSegmentTest.TestSegmentUser segUser = + new BaseTableDataManagerAcquireSegmentTest.TestSegmentUser(tableDataManager); Thread segUserThread = new Thread(segUser); queryThreads.add(segUserThread); segUserThread.start(); } - BaseTableDataManagerTest.TestHelixWorker helixWorker = - new BaseTableDataManagerTest.TestHelixWorker(tableDataManager); + BaseTableDataManagerAcquireSegmentTest.TestHelixWorker helixWorker = + new BaseTableDataManagerAcquireSegmentTest.TestHelixWorker(tableDataManager); Thread helixWorkerThread = new Thread(helixWorker); helixWorkerThread.start(); _masterThread = Thread.currentThread(); @@ -325,7 +326,7 @@ public class BaseTableDataManagerTest { while (!_closing) { try { List<SegmentDataManager> segmentDataManagers = null; - double probability = RANDOM.nextDouble(); + double probability = _random.nextDouble(); if (probability <= ACQUIRE_ALL_PROBABILITY) { segmentDataManagers = _tableDataManager.acquireAllSegments(); } else { @@ -344,7 +345,7 @@ public class BaseTableDataManagerTest { } // To simulate real use case, may be we can add a small percent that is returned right away after pruning? try { - int sleepTime = RANDOM.nextInt(_maxUseTimeMs - _minUseTimeMs + 1) + _minUseTimeMs; + int sleepTime = _random.nextInt(_maxUseTimeMs - _minUseTimeMs + 1) + _minUseTimeMs; Thread.sleep(sleepTime); } catch (InterruptedException e) { _closing = true; @@ -366,7 +367,7 @@ public class BaseTableDataManagerTest { Set<Integer> segmentIds = new HashSet<>(totalSegs); final int nSegments = totalSegs * _nSegsPercent / 100; while (segmentIds.size() != nSegments) { - segmentIds.add(RANDOM.nextInt(totalSegs) + lo); + segmentIds.add(_random.nextInt(totalSegs) + lo); } return segmentIds; } @@ -394,7 +395,7 @@ public class BaseTableDataManagerTest { public void run() { while (!_closing) { try { - int nextInt = RANDOM.nextInt(100); + int nextInt = _random.nextInt(100); if (nextInt < _removePercent) { removeSegment(); } else if (nextInt < _removePercent + _replacePercent) { @@ -403,7 +404,7 @@ public class BaseTableDataManagerTest { addSegment(); } try { - int sleepTime = RANDOM.nextInt(_maxSleepMs - _minSleepMs + 1) + _minSleepMs; + int sleepTime = _random.nextInt(_maxSleepMs - _minSleepMs + 1) + _minSleepMs; Thread.sleep(sleepTime); } catch (InterruptedException e) { _closing = true; @@ -419,16 +420,16 @@ public class BaseTableDataManagerTest { private void addSegment() { final int segmentToAdd = _hi + 1; final String segName = SEGMENT_PREFIX + segmentToAdd; - _tableDataManager.addSegment(makeImmutableSegment(segName, RANDOM.nextInt())); + _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt())); _allSegManagers.add(_internalSegMap.get(segName)); _hi = segmentToAdd; } // Replace a segment between _lo and _hi private void replaceSegment() { - int segToReplace = RANDOM.nextInt(_hi - _lo + 1) + _lo; + int segToReplace = _random.nextInt(_hi - _lo + 1) + _lo; final String segName = SEGMENT_PREFIX + segToReplace; - _tableDataManager.addSegment(makeImmutableSegment(segName, RANDOM.nextInt())); + _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt())); _allSegManagers.add(_internalSegMap.get(segName)); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index 32edb2e..1a5a3d5 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -19,428 +19,382 @@ package org.apache.pinot.core.data.manager; import java.io.File; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; +import java.io.IOException; +import java.util.HashMap; import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.PinotMetricUtils; import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager; -import org.apache.pinot.segment.local.data.manager.SegmentDataManager; -import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig; -import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.SegmentMetadata; -import org.testng.Assert; -import org.testng.annotations.AfterSuite; +import org.apache.pinot.segment.spi.creator.SegmentVersion; +import org.apache.pinot.spi.crypt.PinotCrypter; +import org.apache.pinot.spi.crypt.PinotCrypterFactory; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.BeforeSuite; import org.testng.annotations.Test; -import static org.mockito.Mockito.*; +import static org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY; +import static org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY; +import static org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_WAIT_MS_CONFIG_KEY; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class BaseTableDataManagerTest { + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "OfflineTableDataManagerTest"); - private static final Random RANDOM = new Random(); - private static final String TABLE_NAME = "testTable"; - private static final String SEGMENT_PREFIX = "segment"; - - // Set once for the suite - private File _tmpDir; - - // Set once for every test - private volatile int _nDestroys; - private volatile boolean _closing; - private Set<ImmutableSegment> _allSegments = new HashSet<>(); - private Set<SegmentDataManager> _accessedSegManagers = - Collections.newSetFromMap(new ConcurrentHashMap<SegmentDataManager, Boolean>()); - private Set<SegmentDataManager> _allSegManagers = - Collections.newSetFromMap(new ConcurrentHashMap<SegmentDataManager, Boolean>()); - private AtomicInteger _numQueries = new AtomicInteger(0); - private Map<String, ImmutableSegmentDataManager> _internalSegMap; - private Throwable _exception; - private Thread _masterThread; - // Segment numbers in place. - // When we add a segment, we add hi+1, and bump _hi. - // When we remove a segment, we remove _lo and bump _lo - // When we replace a segment, we pick a number between _hi and _lo (inclusive) - private volatile int _lo; - private volatile int _hi; - - @BeforeSuite + private static final String TABLE_NAME = "__table01__"; + + @BeforeMethod public void setUp() throws Exception { - _tmpDir = File.createTempFile("OfflineTableDataManagerTest", null); - _tmpDir.deleteOnExit(); + TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR); + initSegmentFetcher(); } - @AfterSuite - public void tearDown() { - if (_tmpDir != null) { - org.apache.commons.io.FileUtils.deleteQuietly(_tmpDir); - } + @AfterMethod + public void tearDown() + throws Exception { + FileUtils.deleteDirectory(TEMP_DIR); } - @BeforeMethod - public void beforeMethod() { - _nDestroys = 0; - _closing = false; - _allSegments.clear(); - _accessedSegManagers.clear(); - _allSegManagers.clear(); - _numQueries.set(0); - _exception = null; - _masterThread = null; - } + private BaseTableDataManager makeTestableManager() { + TableDataManagerConfig config = mock(TableDataManagerConfig.class); + when(config.getTableName()).thenReturn(TABLE_NAME); + when(config.getDataDir()).thenReturn(new File(TEMP_DIR, TABLE_NAME).getAbsolutePath()); - private TableDataManager makeTestableManager() - throws Exception { - TableDataManager tableDataManager = new OfflineTableDataManager(); - TableDataManagerConfig config; - { - config = mock(TableDataManagerConfig.class); - when(config.getTableName()).thenReturn(TABLE_NAME); - when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath()); - } + OfflineTableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class), new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null); tableDataManager.start(); - Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap"); - segsMapField.setAccessible(true); - _internalSegMap = (Map<String, ImmutableSegmentDataManager>) segsMapField.get(tableDataManager); return tableDataManager; } - private ImmutableSegment makeImmutableSegment(String segmentName, int totalDocs) { - ImmutableSegment immutableSegment = mock(ImmutableSegment.class); - SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); - when(immutableSegment.getSegmentMetadata()).thenReturn(segmentMetadata); - when(immutableSegment.getSegmentName()).thenReturn(segmentName); - when(immutableSegment.getSegmentMetadata().getTotalDocs()).thenReturn(totalDocs); - doAnswer(invocation -> { - _nDestroys++; - return null; - }).when(immutableSegment).destroy(); - _allSegments.add(immutableSegment); - return immutableSegment; + @Test + public void testReloadSegmentNewData() + throws Exception { + BaseTableDataManager tmgr = makeTestableManager(); + File tempRootDir = tmgr.getSegmentDataDir("test-new-data"); + + // Create an empty segment and compress it to tar.gz as the one in deep store. + // All input and intermediate files are put in the tempRootDir. + File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + File tempInputDir = new File(tempRootDir, "seg01_input"); + FileUtils + .write(new File(tempInputDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=remove"); + TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar); + FileUtils.deleteQuietly(tempInputDir); + + SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class); + when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath()); + when(zkmd.getCrc()).thenReturn(Long.valueOf(1024)); + + File indexDir = tmgr.getSegmentDataDir("seg01"); + FileUtils.write(new File(indexDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=local"); + + // Different CRCs leading to segment download. + SegmentMetadata llmd = mock(SegmentMetadata.class); + when(llmd.getCrc()).thenReturn("10240"); + when(llmd.getIndexDir()).thenReturn(indexDir); + + tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd, null, false); + assertTrue(tmgr.getSegmentDataDir("seg01").exists()); + assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties")) + .contains("k=remove")); } @Test - public void basicTest() + public void testReloadSegmentLocalCopy() throws Exception { - TableDataManager tableDataManager = makeTestableManager(); - Assert.assertEquals(tableDataManager.getNumSegments(), 0); - final String segmentName = "TestSegment"; - final int totalDocs = 23456; - // Add the segment, get it for use, remove the segment, and then return it. - // Make sure that the segment is not destroyed before return. - ImmutableSegment immutableSegment = makeImmutableSegment(segmentName, totalDocs); - tableDataManager.addSegment(immutableSegment); - Assert.assertEquals(tableDataManager.getNumSegments(), 1); - SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName); - Assert.assertEquals(segmentDataManager.getReferenceCount(), 2); - tableDataManager.removeSegment(segmentName); - Assert.assertEquals(tableDataManager.getNumSegments(), 0); - Assert.assertEquals(segmentDataManager.getReferenceCount(), 1); - Assert.assertEquals(_nDestroys, 0); - tableDataManager.releaseSegment(segmentDataManager); - Assert.assertEquals(segmentDataManager.getReferenceCount(), 0); - Assert.assertEquals(_nDestroys, 1); - - // Now the segment should not be available for use.Also, returning a null reader is fine - segmentDataManager = tableDataManager.acquireSegment(segmentName); - Assert.assertNull(segmentDataManager); - List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireAllSegments(); - Assert.assertEquals(segmentDataManagers.size(), 0); - - // Removing the segment again is fine. - tableDataManager.removeSegment(segmentName); - Assert.assertEquals(tableDataManager.getNumSegments(), 0); - - // Add a new segment and remove it in order this time. - final String anotherSeg = "AnotherSegment"; - ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs); - tableDataManager.addSegment(ix1); - Assert.assertEquals(tableDataManager.getNumSegments(), 1); - SegmentDataManager sdm1 = tableDataManager.acquireSegment(anotherSeg); - Assert.assertNotNull(sdm1); - Assert.assertEquals(sdm1.getReferenceCount(), 2); - // acquire all segments - List<SegmentDataManager> segmentDataManagersList = tableDataManager.acquireAllSegments(); - Assert.assertEquals(segmentDataManagersList.size(), 1); - Assert.assertEquals(sdm1.getReferenceCount(), 3); - for (SegmentDataManager dataManager : segmentDataManagersList) { - tableDataManager.releaseSegment(dataManager); - } - // count is back to original - Assert.assertEquals(sdm1.getReferenceCount(), 2); - tableDataManager.releaseSegment(sdm1); - Assert.assertEquals(sdm1.getReferenceCount(), 1); - // Now replace the segment with another one. - ImmutableSegment ix2 = makeImmutableSegment(anotherSeg, totalDocs + 1); - tableDataManager.addSegment(ix2); - Assert.assertEquals(tableDataManager.getNumSegments(), 1); - // Now the previous one should have been destroyed, and - Assert.assertEquals(sdm1.getReferenceCount(), 0); - verify(ix1, times(1)).destroy(); - // Delete ix2 without accessing it. - SegmentDataManager sdm2 = _internalSegMap.get(anotherSeg); - Assert.assertEquals(sdm2.getReferenceCount(), 1); - tableDataManager.removeSegment(anotherSeg); - Assert.assertEquals(tableDataManager.getNumSegments(), 0); - Assert.assertEquals(sdm2.getReferenceCount(), 0); - verify(ix2, times(1)).destroy(); - tableDataManager.shutDown(); + BaseTableDataManager tmgr = makeTestableManager(); + File tempRootDir = tmgr.getSegmentDataDir("test-local-copy"); + + // Create an empty segment and compress it to tar.gz as the one in deep store. + // All input and intermediate files are put in the tempRootDir. + File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + File tempInputDir = new File(tempRootDir, "seg01_input"); + FileUtils + .write(new File(tempInputDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=remote"); + TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar); + FileUtils.deleteQuietly(tempInputDir); + + SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class); + when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath()); + when(zkmd.getCrc()).thenReturn(Long.valueOf(1024)); + + File indexDir = tmgr.getSegmentDataDir("seg01"); + FileUtils.write(new File(indexDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=local"); + + // Same CRCs so load the local copy. + SegmentMetadata llmd = mock(SegmentMetadata.class); + when(llmd.getCrc()).thenReturn("1024"); + when(llmd.getIndexDir()).thenReturn(indexDir); + + tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd, null, false); + assertTrue(tmgr.getSegmentDataDir("seg01").exists()); + assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties")) + .contains("k=local")); + } + + @Test + public void testReloadSegmentForceDownload() + throws Exception { + BaseTableDataManager tmgr = makeTestableManager(); + File tempRootDir = tmgr.getSegmentDataDir("test-force-download"); + + // Create an empty segment and compress it to tar.gz as the one in deep store. + // All input and intermediate files are put in the tempRootDir. + File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + File tempInputDir = new File(tempRootDir, "seg01_input"); + FileUtils + .write(new File(tempInputDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=remote"); + TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar); + FileUtils.deleteQuietly(tempInputDir); + + SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class); + when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath()); + when(zkmd.getCrc()).thenReturn(Long.valueOf(1024)); + + File indexDir = tmgr.getSegmentDataDir("seg01"); + FileUtils.write(new File(indexDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=local"); + + // Same CRC but force to download + SegmentMetadata llmd = mock(SegmentMetadata.class); + when(llmd.getCrc()).thenReturn("1024"); + when(llmd.getIndexDir()).thenReturn(indexDir); + + tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd, null, true); + assertTrue(tmgr.getSegmentDataDir("seg01").exists()); + assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties")) + .contains("k=remote")); } - /* - * These tests simulate the access of segments via OfflineTableDataManager. - * - * It creates 31 segments (0..30) to start with and adds them to the tableDataManager (hi = 30, lo = 0) - * It spawns 10 "query" threads, and one "helix" thread. - * - * The query threads pick up a random of 70% the segments and 'get' them, wait a random period of time (5 to 80ms) - * and then 'release' the segments back, and does this in a continuous loop. - * - * The helix thread decides to do one of the following: - * - Add a segment (hi+1), and bumps hi by 1 (does this 20% of the time) - * - Remove a segment (lo) and bumps up lo by 1 (does this 20% of the time) - * - Replaces a segment (a randomm one between (lo,hi), 60% of the time) - * and then waits for a random of 50-300ms before attempting one of the ops again. - */ + @Test + public void testAddOrReplaceSegmentNewData() + throws Exception { + BaseTableDataManager tmgr = makeTestableManager(); + File tempRootDir = tmgr.getSegmentDataDir("test-new-data"); + + // Create an empty segment and compress it to tar.gz as the one in deep store. + // All input and intermediate files are put in the tempRootDir. + File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + File tempInputDir = new File(tempRootDir, "seg01_input"); + FileUtils.write(new File(tempInputDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01"); + TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar); + FileUtils.deleteQuietly(tempInputDir); + + SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class); + when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath()); + when(zkmd.getCrc()).thenReturn(Long.valueOf(1024)); + + // Different CRCs leading to segment download. + SegmentMetadata llmd = mock(SegmentMetadata.class); + when(llmd.getCrc()).thenReturn("10240"); + + assertFalse(tmgr.getSegmentDataDir("seg01").exists()); + tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd); + assertTrue(tmgr.getSegmentDataDir("seg01").exists()); + assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties")) + .contains("docs=0")); + } @Test - public void testReplace() + public void testAddOrReplaceSegmentNoop() throws Exception { - _lo = 0; - _hi = 30; // Total number of segments we have in the server. - final int numQueryThreads = 10; - final int runTimeSec = 20; - // With the current parameters, 3k ops take about 15 seconds, create about 90 segments and drop about half of them - // Running with coverage, it provides complete coverage of the (relevant) lines in OfflineTableDataManager - - Random random = new Random(); - TableDataManager tableDataManager = makeTestableManager(); - - for (int i = _lo; i <= _hi; i++) { - final String segName = SEGMENT_PREFIX + i; - tableDataManager.addSegment(makeImmutableSegment(segName, random.nextInt())); - _allSegManagers.add(_internalSegMap.get(segName)); - } + BaseTableDataManager tmgr = makeTestableManager(); + + SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class); + when(zkmd.getCrc()).thenReturn(Long.valueOf(1024)); - runStorageServer(numQueryThreads, runTimeSec, tableDataManager); // replaces segments while online + SegmentMetadata llmd = mock(SegmentMetadata.class); + when(llmd.getCrc()).thenReturn("1024"); -// System.out.println("Nops = " + _numQueries + ",nDrops=" + _nDestroys + ",nCreates=" + _allSegments.size()); - tableDataManager.shutDown(); + assertFalse(tmgr.getSegmentDataDir("seg01").exists()); + tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd); + // As CRC is same, the index dir is left as is, so not get created by the test. + assertFalse(tmgr.getSegmentDataDir("seg01").exists()); } - private void runStorageServer(int numQueryThreads, int runTimeSec, TableDataManager tableDataManager) + @Test + public void testAddOrReplaceSegmentRecovered() throws Exception { - // Start 1 helix worker thread and as many query threads as configured. - List<Thread> queryThreads = new ArrayList<>(numQueryThreads); - for (int i = 0; i < numQueryThreads; i++) { - BaseTableDataManagerTest.TestSegmentUser segUser = new BaseTableDataManagerTest.TestSegmentUser(tableDataManager); - Thread segUserThread = new Thread(segUser); - queryThreads.add(segUserThread); - segUserThread.start(); - } + BaseTableDataManager tmgr = makeTestableManager(); - BaseTableDataManagerTest.TestHelixWorker helixWorker = - new BaseTableDataManagerTest.TestHelixWorker(tableDataManager); - Thread helixWorkerThread = new Thread(helixWorker); - helixWorkerThread.start(); - _masterThread = Thread.currentThread(); + SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class); + // Make this equal to the default crc value, so no need to make a dummy creation.meta file. + when(zkmd.getCrc()).thenReturn(Long.MIN_VALUE); - try { - Thread.sleep(runTimeSec * 1000); - } catch (InterruptedException e) { + File backup = tmgr.getSegmentDataDir("seg01" + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX); + FileUtils.write(new File(backup, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01"); - } - _closing = true; + assertFalse(tmgr.getSegmentDataDir("seg01").exists()); + tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, null); + assertTrue(tmgr.getSegmentDataDir("seg01").exists()); + assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties")) + .contains("docs=0")); + } - helixWorkerThread.join(); - for (Thread t : queryThreads) { - t.join(); - } + @Test + public void testAddOrReplaceSegmentNotRecovered() + throws Exception { + BaseTableDataManager tmgr = makeTestableManager(); + File tempRootDir = tmgr.getSegmentDataDir("test-force-download"); + + // Create an empty segment and compress it to tar.gz as the one in deep store. + // All input and intermediate files are put in the tempRootDir. + File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + File tempInputDir = new File(tempRootDir, "seg01_input"); + FileUtils + .write(new File(tempInputDir, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=remote"); + TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar); + FileUtils.deleteQuietly(tempInputDir); + + SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class); + when(zkmd.getDownloadUrl()).thenReturn("file://" + tempTar.getAbsolutePath()); + when(zkmd.getCrc()).thenReturn(Long.valueOf(1024)); + + // Though can recover from backup, but CRC is different. Local CRC is Long.MIN_VALUE. + File backup = tmgr.getSegmentDataDir("seg01" + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX); + FileUtils.write(new File(backup, "metadata.properties"), "segment.total.docs=0\nsegment.name=seg01\nk=local"); + + assertFalse(tmgr.getSegmentDataDir("seg01").exists()); + tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, null); + assertTrue(tmgr.getSegmentDataDir("seg01").exists()); + assertTrue(FileUtils.readFileToString(new File(tmgr.getSegmentDataDir("seg01"), "metadata.properties")) + .contains("k=remote")); + } - if (_exception != null) { - Assert.fail("One of the threads failed", _exception); - } + @Test + public void testDownloadAndDecrypt() + throws Exception { + File tempInput = new File(TEMP_DIR, "tmp.txt"); + FileUtils.write(tempInput, "this is from somewhere remote"); - // tableDataManager should be quiescent now. + SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class); + when(zkmd.getDownloadUrl()).thenReturn("file://" + tempInput.getAbsolutePath()); - // All segments we ever created must have a corresponding segment manager. - Assert.assertEquals(_allSegManagers.size(), _allSegments.size()); + BaseTableDataManager tmgr = makeTestableManager(); + File tempRootDir = tmgr.getSegmentDataDir("test-download-decrypt"); - final int nSegsAcccessed = _accessedSegManagers.size(); - for (SegmentDataManager segmentDataManager : _internalSegMap.values()) { - Assert.assertEquals(segmentDataManager.getReferenceCount(), 1); - // We should never have called destroy on these segments. Remove it from the list of accessed segments. - verify(segmentDataManager.getSegment(), never()).destroy(); - _allSegManagers.remove(segmentDataManager); - _accessedSegManagers.remove(segmentDataManager); - } + File tarFile = tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir); + assertEquals(FileUtils.readFileToString(tarFile), "this is from somewhere remote"); - // For the remaining segments in accessed list, destroy must have been called exactly once. - for (SegmentDataManager segmentDataManager : _allSegManagers) { - verify(segmentDataManager.getSegment(), times(1)).destroy(); - // Also their count should be 0 - Assert.assertEquals(segmentDataManager.getReferenceCount(), 0); - } + when(zkmd.getCrypterName()).thenReturn("fakePinotCrypter"); + tarFile = tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir); + assertEquals(FileUtils.readFileToString(tarFile), "this is from somewhere remote"); - // The number of segments we accessed must be <= total segments created. - Assert.assertTrue(nSegsAcccessed <= _allSegments.size(), - "Accessed=" + nSegsAcccessed + ",created=" + _allSegments.size()); - // The number of segments we have seen and that are not there anymore, must be <= number destroyed. - Assert.assertTrue(_accessedSegManagers.size() <= _nDestroys, - "SeenButUnavailableNow=" + _accessedSegManagers.size() + ",Destroys=" + _nDestroys); + FakePinotCrypter fakeCrypter = (FakePinotCrypter) PinotCrypterFactory.create("fakePinotCrypter"); + assertTrue(fakeCrypter._origFile.getAbsolutePath().endsWith("__table01__/test-download-decrypt/seg01.tar.gz.enc")); + assertTrue(fakeCrypter._decFile.getAbsolutePath().endsWith("__table01__/test-download-decrypt/seg01.tar.gz")); - // The current number of segments must be the as expected (hi-lo+1) - Assert.assertEquals(_internalSegMap.size(), _hi - _lo + 1); + try { + // Set maxRetry to 0 to cause retry failure immediately. + Map<String, Object> properties = new HashMap<>(); + properties.put(RETRY_COUNT_CONFIG_KEY, 0); + SegmentFetcherFactory.init(new PinotConfiguration(properties)); + tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir); + fail(); + } catch (AttemptsExceededException e) { + assertEquals(e.getMessage(), "Operation failed after 0 attempts"); + } } - private class TestSegmentUser implements Runnable { - private static final double ACQUIRE_ALL_PROBABILITY = 0.20; - private final int _minUseTimeMs = 5; - private final int _maxUseTimeMs = 80; - private final int _nSegsPercent = 70; // We use 70% of the segments for any query. - private final TableDataManager _tableDataManager; - - private TestSegmentUser(TableDataManager tableDataManager) { - _tableDataManager = tableDataManager; - } + @Test + public void testUntarAndMoveSegment() + throws IOException { + BaseTableDataManager tmgr = makeTestableManager(); + File tempRootDir = tmgr.getSegmentDataDir("test-untar-move"); + + // All input and intermediate files are put in the tempRootDir. + File tempTar = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + File tempInputDir = new File(tempRootDir, "seg01_input"); + FileUtils.write(new File(tempInputDir, "tmp.txt"), "this is in segment dir"); + TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar); + FileUtils.deleteQuietly(tempInputDir); + + // The destination is the segment directory at the same level of tempRootDir. + File indexDir = tmgr.untarAndMoveSegment("seg01", tempTar, tempRootDir); + assertEquals(indexDir, tmgr.getSegmentDataDir("seg01")); + assertEquals(FileUtils.readFileToString(new File(indexDir, "tmp.txt")), "this is in segment dir"); - @Override - public void run() { - while (!_closing) { - try { - List<SegmentDataManager> segmentDataManagers = null; - double probability = RANDOM.nextDouble(); - if (probability <= ACQUIRE_ALL_PROBABILITY) { - segmentDataManagers = _tableDataManager.acquireAllSegments(); - } else { - Set<Integer> segmentIds = pickSegments(); - List<String> segmentList = new ArrayList<>(segmentIds.size()); - for (Integer segmentId : segmentIds) { - segmentList.add(SEGMENT_PREFIX + segmentId); - } - segmentDataManagers = _tableDataManager.acquireSegments(segmentList); - } - // Some of them may be rejected, but that is OK. - - // Keep track of all segment data managers we ever accessed. - for (SegmentDataManager segmentDataManager : segmentDataManagers) { - _accessedSegManagers.add(segmentDataManager); - } - // To simulate real use case, may be we can add a small percent that is returned right away after pruning? - try { - int sleepTime = RANDOM.nextInt(_maxUseTimeMs - _minUseTimeMs + 1) + _minUseTimeMs; - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - _closing = true; - } - for (SegmentDataManager segmentDataManager : segmentDataManagers) { - _tableDataManager.releaseSegment(segmentDataManager); - } - } catch (Throwable t) { - _masterThread.interrupt(); - _exception = t; - } - } + try { + tmgr.untarAndMoveSegment("seg01", new File(tempRootDir, "unknown.txt"), TEMP_DIR); + fail(); + } catch (Exception e) { + // expected. } + } - private Set<Integer> pickSegments() { - int hi = _hi; - int lo = _lo; - int totalSegs = hi - lo + 1; - Set<Integer> segmentIds = new HashSet<>(totalSegs); - final int nSegments = totalSegs * _nSegsPercent / 100; - while (segmentIds.size() != nSegments) { - segmentIds.add(RANDOM.nextInt(totalSegs) + lo); - } - return segmentIds; - } + @Test + public void testIsNewSegmentMetadata() + throws IOException { + SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class); + when(zkmd.getCrc()).thenReturn(Long.valueOf(1024)); + assertTrue(BaseTableDataManager.isNewSegment(zkmd, null)); + + SegmentMetadata llmd = mock(SegmentMetadata.class); + when(llmd.getCrc()).thenReturn("1024"); + assertFalse(BaseTableDataManager.isNewSegment(zkmd, llmd)); + + llmd = mock(SegmentMetadata.class); + when(llmd.getCrc()).thenReturn("10245"); + assertTrue(BaseTableDataManager.isNewSegment(zkmd, llmd)); } - private class TestHelixWorker implements Runnable { - private final int _removePercent; - private final int _replacePercent; - private final int _addPercent; - private final int _minSleepMs; - private final int _maxSleepMs; - private final TableDataManager _tableDataManager; - - private TestHelixWorker(TableDataManager tableDataManager) { - _tableDataManager = tableDataManager; - - _removePercent = 20; - _addPercent = 20; - _replacePercent = 60; - _minSleepMs = 50; - _maxSleepMs = 300; - } + // Has to be public class for the class loader to work. + public static class FakePinotCrypter implements PinotCrypter { + private File _origFile; + private File _decFile; @Override - public void run() { - while (!_closing) { - try { - int nextInt = RANDOM.nextInt(100); - if (nextInt < _removePercent) { - removeSegment(); - } else if (nextInt < _removePercent + _replacePercent) { - replaceSegment(); - } else { - addSegment(); - } - try { - int sleepTime = RANDOM.nextInt(_maxSleepMs - _minSleepMs + 1) + _minSleepMs; - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - _closing = true; - } - } catch (Throwable t) { - _masterThread.interrupt(); - _exception = t; - } - } + public void init(PinotConfiguration config) { } - // Add segment _hi + 1,bump hi. - private void addSegment() { - final int segmentToAdd = _hi + 1; - final String segName = SEGMENT_PREFIX + segmentToAdd; - _tableDataManager.addSegment(makeImmutableSegment(segName, RANDOM.nextInt())); - _allSegManagers.add(_internalSegMap.get(segName)); - _hi = segmentToAdd; + @Override + public void encrypt(File origFile, File encFile) { } - // Replace a segment between _lo and _hi - private void replaceSegment() { - int segToReplace = RANDOM.nextInt(_hi - _lo + 1) + _lo; - final String segName = SEGMENT_PREFIX + segToReplace; - _tableDataManager.addSegment(makeImmutableSegment(segName, RANDOM.nextInt())); - _allSegManagers.add(_internalSegMap.get(segName)); + @Override + public void decrypt(File origFile, File decFile) { + _origFile = origFile; + _decFile = decFile; } + } - // Remove the segment _lo and then bump _lo - private void removeSegment() { - // Keep at least one segment in place. - if (_hi > _lo) { - _tableDataManager.removeSegment(SEGMENT_PREFIX + _lo); - _lo++; - } else { - addSegment(); - } - } + private static void initSegmentFetcher() + throws Exception { + Map<String, Object> properties = new HashMap<>(); + properties.put(RETRY_COUNT_CONFIG_KEY, 3); + properties.put(RETRY_WAIT_MS_CONFIG_KEY, 100); + properties.put(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, 5); + SegmentFetcherFactory.init(new PinotConfiguration(properties)); + + // Setup crypter + properties.put("class.fakePinotCrypter", FakePinotCrypter.class.getName()); + PinotCrypterFactory.init(new PinotConfiguration(properties)); + } + + private static IndexLoadingConfig newDummyIndexLoadingConfig() { + IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class); + when(indexLoadingConfig.getReadMode()).thenReturn(ReadMode.mmap); + when(indexLoadingConfig.getSegmentVersion()).thenReturn(SegmentVersion.v3); + return indexLoadingConfig; } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java new file mode 100644 index 0000000..e2afd3c --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.realtime; + +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.HLCSegmentName; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.testng.annotations.Test; + +import static org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class RealtimeTableDataManagerTest { + @Test + public void testAllowDownload() { + RealtimeTableDataManager mgr = new RealtimeTableDataManager(null); + + String groupId = "myTable_REALTIME_1234567_0"; + String partitionRange = "ALL"; + String sequenceNumber = "1234567"; + HLCSegmentName hlc = new HLCSegmentName(groupId, partitionRange, sequenceNumber); + assertFalse(mgr.allowDownload(hlc.getSegmentName(), null)); + + LLCSegmentName llc = new LLCSegmentName("tbl01", 0, 1000000, System.currentTimeMillis()); + SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class); + when(zkmd.getStatus()).thenReturn(Status.IN_PROGRESS); + assertFalse(mgr.allowDownload(llc.getSegmentName(), zkmd)); + + when(zkmd.getStatus()).thenReturn(Status.DONE); + when(zkmd.getDownloadUrl()).thenReturn(""); + assertFalse(mgr.allowDownload(llc.getSegmentName(), zkmd)); + + when(zkmd.getDownloadUrl()).thenReturn("remote"); + assertTrue(mgr.allowDownload(llc.getSegmentName(), zkmd)); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 4078bda..36634db 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -27,11 +27,14 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.Pair; @@ -79,6 +82,27 @@ public interface TableDataManager { throws Exception; /** + * Reloads an existing immutable segment for the table, which can be an OFFLINE or REALTIME table. + * A new segment may be downloaded if the local one has a different CRC; or can be forced to download + * if forceDownload flag is true. This operation is conducted within a failure handling framework + * and made transparent to ongoing queries, because the segment is in online serving state. + */ + void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata, + SegmentMetadata localMetadata, @Nullable Schema schema, boolean forceDownload) + throws Exception; + + /** + * Adds or replaces an immutable segment for the table, which can be an OFFLINE or REALTIME table. + * A new segment may be downloaded if the local one has a different CRC or doesn't work as expected. + * This operation is conducted outside the failure handling framework as used in segment reloading, + * because the segment is not yet online serving queries, e.g. this method is used to add a new segment, + * or transition a segment to online serving state. + */ + void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata, + @Nullable SegmentMetadata localMetadata) + throws Exception; + + /** * Removes a segment from the table. */ void removeSegment(String segmentName); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index d83ed50..e58d020 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -54,6 +54,7 @@ import org.apache.pinot.common.restlet.resources.SystemResourceInfo; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.ServiceStatus.Status; import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.data.manager.InstanceDataManager; @@ -70,6 +71,7 @@ import org.apache.pinot.server.realtime.ControllerLeaderLocator; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.apache.pinot.server.starter.ServerInstance; import org.apache.pinot.server.starter.ServerQueriesDisabledTracker; +import org.apache.pinot.spi.crypt.PinotCrypterFactory; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.environmentprovider.PinotEnvironmentProvider; import org.apache.pinot.spi.environmentprovider.PinotEnvironmentProviderFactory; @@ -375,10 +377,9 @@ public abstract class BaseServerStarter implements ServiceStartable { _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager); ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); - SegmentFetcherAndLoader fetcherAndLoader = - new SegmentFetcherAndLoader(_serverConf, instanceDataManager, serverMetrics); + initSegmentFetcher(_serverConf); StateModelFactory<?> stateModelFactory = - new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, fetcherAndLoader); + new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); _helixManager.getStateMachineEngine() .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory); // Start the server instance as a pre-connect callback so that it starts after connecting to the ZK in order to @@ -448,7 +449,7 @@ public abstract class BaseServerStarter implements ServiceStartable { // Register message handler factory SegmentMessageHandlerFactory messageHandlerFactory = - new SegmentMessageHandlerFactory(fetcherAndLoader, instanceDataManager, serverMetrics); + new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics); _helixManager.getMessagingService() .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory); @@ -708,4 +709,23 @@ public abstract class BaseServerStarter implements ServiceStartable { instanceConfig.getRecord().setMapField(Helix.Instance.SYSTEM_RESOURCE_INFO_KEY, systemResourceMap); helixAdmin.setInstanceConfig(helixClusterName, instanceId, instanceConfig); } + + /** + * Initialize the components to download segments from deep store. They used to be + * initialized in SegmentFetcherAndLoader, which has been removed to consolidate + * segment download functionality for both Offline and Realtime tables. So those + * components are initialized where SegmentFetcherAndLoader was initialized. + */ + private void initSegmentFetcher(PinotConfiguration config) + throws Exception { + PinotConfiguration pinotFSConfig = config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY); + PinotFSFactory.init(pinotFSConfig); + + PinotConfiguration segmentFetcherFactoryConfig = + config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY); + SegmentFetcherFactory.init(segmentFetcherFactoryConfig); + + PinotConfiguration pinotCrypterConfig = config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER); + PinotCrypterFactory.init(pinotCrypterConfig); + } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 78aee36..3cf0bbb 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -32,11 +32,11 @@ import java.util.concurrent.locks.Lock; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; import org.apache.pinot.core.data.manager.InstanceDataManager; @@ -47,11 +47,8 @@ import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig; -import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; -import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; -import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -64,8 +61,6 @@ import org.slf4j.LoggerFactory; /** * The class <code>HelixInstanceDataManager</code> is the instance data manager based on Helix. - * - * TODO: move SegmentFetcherAndLoader into this class to make this the top level manager */ @ThreadSafe public class HelixInstanceDataManager implements InstanceDataManager { @@ -192,7 +187,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { } @Override - public void reloadSegment(String tableNameWithType, String segmentName) + public void reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload) throws Exception { LOGGER.info("Reloading single segment: {} in table: {}", segmentName, tableNameWithType); SegmentMetadata segmentMetadata = getSegmentMetadata(tableNameWithType, segmentName); @@ -206,13 +201,13 @@ public class HelixInstanceDataManager implements InstanceDataManager { Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType); - reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema); + reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload); LOGGER.info("Reloaded single segment: {} in table: {}", segmentName, tableNameWithType); } @Override - public void reloadAllSegments(String tableNameWithType) + public void reloadAllSegments(String tableNameWithType, boolean forceDownload) throws Exception { LOGGER.info("Reloading all segments in table: {}", tableNameWithType); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); @@ -221,17 +216,18 @@ public class HelixInstanceDataManager implements InstanceDataManager { Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType); for (SegmentMetadata segmentMetadata : getAllSegmentsMetadata(tableNameWithType)) { - reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema); + reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload); } LOGGER.info("Reloaded all segments in table: {}", tableNameWithType); } private void reloadSegment(String tableNameWithType, SegmentMetadata segmentMetadata, TableConfig tableConfig, - @Nullable Schema schema) + @Nullable Schema schema, boolean forceDownload) throws Exception { String segmentName = segmentMetadata.getName(); - LOGGER.info("Reloading segment: {} in table: {}", segmentName, tableNameWithType); + LOGGER.info("Reloading segment: {} in table: {} with forceDownload: {}", segmentName, tableNameWithType, + forceDownload); TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); if (tableDataManager == null) { @@ -259,53 +255,49 @@ public class HelixInstanceDataManager implements InstanceDataManager { return; } - Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir); - File parentFile = indexDir.getParentFile(); - File segmentBackupDir = - new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX); + SegmentZKMetadata zkMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName); + Preconditions.checkNotNull(zkMetadata); // This method might modify the file on disk. Use segment lock to prevent race condition Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName); try { segmentLock.lock(); - // First rename index directory to segment backup directory so that original segment have all file descriptors - // point to the segment backup directory to ensure original segment serves queries properly - - // Rename index directory to segment backup directory (atomic) - Preconditions.checkState(indexDir.renameTo(segmentBackupDir), - "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir); + // Reloads an existing segment, and the local segment metadata is existing as asserted above. + tableDataManager.reloadSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig), + zkMetadata, segmentMetadata, schema, forceDownload); + LOGGER.info("Reloaded segment: {} of table: {}", segmentName, tableNameWithType); + } finally { + segmentLock.unlock(); + } + } - // Copy from segment backup directory back to index directory - FileUtils.copyDirectory(segmentBackupDir, indexDir); + @Override + public void addOrReplaceSegment(String tableNameWithType, String segmentName) + throws Exception { + LOGGER.info("Adding or replacing segment: {} for table: {}", segmentName, tableNameWithType); - // Load from index directory - ImmutableSegment immutableSegment = ImmutableSegmentLoader - .load(indexDir, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig), schema); + // Get updated table config and segment metadata from Zookeeper. + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); + Preconditions.checkNotNull(tableConfig); + SegmentZKMetadata zkMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName); + Preconditions.checkNotNull(zkMetadata); - // Replace the old segment in memory - tableDataManager.addSegment(immutableSegment); + // This method might modify the file on disk. Use segment lock to prevent race condition + Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName); + try { + segmentLock.lock(); - // Rename segment backup directory to segment temporary directory (atomic) - // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we - // rename the segment backup directory to segment temporary directory, we know the reload already succeeded, so - // that we can safely delete the segment temporary directory - File segmentTempDir = new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX); - Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir), - "Failed to rename segment backup directory: %s to segment temporary directory: %s", segmentBackupDir, - segmentTempDir); - LOGGER.info("Reloaded segment: {} in table: {}", segmentName, tableNameWithType); + // But if table mgr is not created or the segment is not loaded yet, the localMetadata + // is set to null. Then, addOrReplaceSegment method will load the segment accordingly. + SegmentMetadata localMetadata = getSegmentMetadata(tableNameWithType, segmentName); - // Delete segment temporary directory - FileUtils.deleteDirectory(segmentTempDir); - } catch (Exception reloadFailureException) { - try { - LoaderUtils.reloadFailureRecovery(indexDir); - } catch (Exception recoveryFailureException) { - LOGGER.error("Failed to recover after reload failure", recoveryFailureException); - reloadFailureException.addSuppressed(recoveryFailureException); - } - throw reloadFailureException; + _tableDataManagerMap.computeIfAbsent(tableNameWithType, k -> createTableDataManager(k, tableConfig)) + .addOrReplaceSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig), + zkMetadata, localMetadata); + LOGGER.info("Added or replaced segment: {} of table: {}", segmentName, tableNameWithType); } finally { segmentLock.unlock(); } @@ -361,9 +353,13 @@ public class HelixInstanceDataManager implements InstanceDataManager { } } + /** + * Assemble the path to segment dir directly, when table mgr object is not + * created for the given table yet. + */ @Override - public String getSegmentDataDirectory() { - return _instanceDataManagerConfig.getInstanceDataDir(); + public File getSegmentDataDirectory(String tableNameWithType, String segmentName) { + return new File(new File(_instanceDataManagerConfig.getInstanceDataDir(), tableNameWithType), segmentName); } @Override diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java deleted file mode 100644 index 9a4ede4..0000000 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.server.starter.helix; - -import com.google.common.base.Preconditions; -import java.io.File; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.locks.Lock; -import javax.annotation.Nullable; -import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.Utils; -import org.apache.pinot.common.metadata.ZKMetadataProvider; -import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; -import org.apache.pinot.common.metrics.ServerMeter; -import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.common.utils.TarGzCompressionUtils; -import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; -import org.apache.pinot.core.data.manager.InstanceDataManager; -import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; -import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; -import org.apache.pinot.spi.crypt.PinotCrypter; -import org.apache.pinot.spi.crypt.PinotCrypterFactory; -import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.filesystem.PinotFSFactory; -import org.apache.pinot.spi.utils.CommonConstants; -import org.apache.pinot.spi.utils.retry.AttemptsExceededException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class SegmentFetcherAndLoader { - private static final Logger LOGGER = LoggerFactory.getLogger(SegmentFetcherAndLoader.class); - - private static final String TAR_GZ_SUFFIX = ".tar.gz"; - private static final String ENCODED_SUFFIX = ".enc"; - - private final InstanceDataManager _instanceDataManager; - private final ServerMetrics _serverMetrics; - - public SegmentFetcherAndLoader(PinotConfiguration config, InstanceDataManager instanceDataManager, - ServerMetrics serverMetrics) - throws Exception { - _instanceDataManager = instanceDataManager; - _serverMetrics = serverMetrics; - - PinotConfiguration pinotFSConfig = config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY); - PinotConfiguration segmentFetcherFactoryConfig = - config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY); - PinotConfiguration pinotCrypterConfig = config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER); - - PinotFSFactory.init(pinotFSConfig); - SegmentFetcherFactory.init(segmentFetcherFactoryConfig); - PinotCrypterFactory.init(pinotCrypterConfig); - } - - public void replaceAllOfflineSegments(String tableNameWithType) { - LOGGER.info("Replacing all segments in table: {}", tableNameWithType); - List<SegmentMetadata> segMds = _instanceDataManager.getAllSegmentsMetadata(tableNameWithType); - for (SegmentMetadata segMd : segMds) { - addOrReplaceOfflineSegment(tableNameWithType, segMd.getName(), true); - } - LOGGER.info("Replaced all segments in table: {}", tableNameWithType); - } - - public void addOrReplaceOfflineSegment(String tableNameWithType, String segmentName) { - addOrReplaceOfflineSegment(tableNameWithType, segmentName, false); - } - - /** - * Add a new segment or replace an existing segment for offline table. The method checks - * the local segment CRC with the one set in ZK by controller. If both equal, the method - * simply loads the local segment, otherwise it downloads the new segment then load. When - * forceDownload is set to true, the server always downloads the segment. - */ - public void addOrReplaceOfflineSegment(String tableNameWithType, String segmentName, boolean forceDownload) { - SegmentZKMetadata newSegmentZKMetadata = ZKMetadataProvider - .getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), tableNameWithType, segmentName); - Preconditions.checkNotNull(newSegmentZKMetadata); - - LOGGER.info("Adding or replacing segment {} for table {}, metadata {}, downloadIsForced {}", segmentName, - tableNameWithType, newSegmentZKMetadata, forceDownload); - - // This method might modify the file on disk. Use segment lock to prevent race condition - Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName); - try { - segmentLock.lock(); - - // We lock the segment in order to get its metadata, and then release the lock, so it is possible - // that the segment is dropped after we get its metadata. - SegmentMetadata localSegmentMetadata = _instanceDataManager.getSegmentMetadata(tableNameWithType, segmentName); - - if (localSegmentMetadata == null) { - LOGGER.info("Segment {} of table {} is not loaded in memory, checking disk", segmentName, tableNameWithType); - File indexDir = new File(getSegmentLocalDirectory(tableNameWithType, segmentName)); - // Restart during segment reload might leave segment in inconsistent state (index directory might not exist but - // segment backup directory existed), need to first try to recover from reload failure before checking the - // existence of the index directory and loading segment metadata from it - LoaderUtils.reloadFailureRecovery(indexDir); - if (indexDir.exists()) { - LOGGER.info("Segment {} of table {} found on disk, attempting to load it", segmentName, tableNameWithType); - try { - localSegmentMetadata = new SegmentMetadataImpl(indexDir); - LOGGER.info("Found segment {} of table {} with crc {} on disk", segmentName, tableNameWithType, - localSegmentMetadata.getCrc()); - } catch (Exception e) { - // The localSegmentDir should help us get the table name, - LOGGER.error("Failed to load segment metadata from {}. Deleting it.", indexDir, e); - FileUtils.deleteQuietly(indexDir); - localSegmentMetadata = null; - } - try { - if (!forceDownload && !isNewSegmentMetadata(tableNameWithType, newSegmentZKMetadata, - localSegmentMetadata)) { - LOGGER.info("Segment metadata same as before, loading {} of table {} (crc {}) from disk", segmentName, - tableNameWithType, localSegmentMetadata.getCrc()); - _instanceDataManager.addOfflineSegment(tableNameWithType, segmentName, indexDir); - // TODO Update zk metadata with CRC for this instance - return; - } - } catch (Exception e) { - LOGGER - .error("Failed to load {} of table {} from local, will try to reload it from controller!", segmentName, - tableNameWithType, e); - FileUtils.deleteQuietly(indexDir); - localSegmentMetadata = null; - } - } - } - // There is a very unlikely race condition that we may have gotten the metadata of a - // segment that was not dropped when we checked, but was dropped after the check above. - // That is possible only if we get two helix transitions (to drop, and then to add back) the - // segment at the same, or very close to each other.If the race condition triggers, and the - // two segments are same in metadata, then we may end up NOT adding back the segment - // that is in the process of being dropped. - - // If we get here, then either it is the case that we have the segment loaded in memory (and therefore present - // in disk) or, we need to load from the server. In the former case, we still need to check if the metadata - // that we have is different from that in zookeeper. - if (forceDownload || isNewSegmentMetadata(tableNameWithType, newSegmentZKMetadata, localSegmentMetadata)) { - if (forceDownload) { - LOGGER.info("Force to download segment {} of table {} from controller.", segmentName, tableNameWithType); - } else if (localSegmentMetadata == null) { - LOGGER.info("Loading new segment {} of table {} from controller", segmentName, tableNameWithType); - } else { - LOGGER.info("Trying to refresh segment {} of table {} with new data.", segmentName, tableNameWithType); - } - String uri = newSegmentZKMetadata.getDownloadUrl(); - String crypterName = newSegmentZKMetadata.getCrypterName(); - PinotCrypter crypter = (crypterName != null) ? PinotCrypterFactory.create(crypterName) : null; - - // Retry will be done here. - String localSegmentDir = downloadSegmentToLocal(uri, crypter, tableNameWithType, segmentName); - SegmentMetadata segmentMetadata = new SegmentMetadataImpl(new File(localSegmentDir)); - _instanceDataManager.addOfflineSegment(tableNameWithType, segmentName, new File(localSegmentDir)); - LOGGER.info("Downloaded segment {} of table {} crc {} from controller", segmentName, tableNameWithType, - segmentMetadata.getCrc()); - } else { - LOGGER.info("Got already loaded segment {} of table {} crc {} again, will do nothing.", segmentName, - tableNameWithType, localSegmentMetadata.getCrc()); - } - } catch (final Exception e) { - LOGGER.error("Cannot load segment : " + segmentName + " for table " + tableNameWithType, e); - Utils.rethrowException(e); - throw new AssertionError("Should not reach this"); - } finally { - segmentLock.unlock(); - } - } - - private boolean isNewSegmentMetadata(String tableNameWithType, SegmentZKMetadata newSegmentZKMetadata, - @Nullable SegmentMetadata existedSegmentMetadata) { - String segmentName = newSegmentZKMetadata.getSegmentName(); - - if (existedSegmentMetadata == null) { - LOGGER.info("Existed segment metadata is null for segment: {} in table: {}", segmentName, tableNameWithType); - return true; - } - - long newCrc = newSegmentZKMetadata.getCrc(); - long existedCrc = Long.valueOf(existedSegmentMetadata.getCrc()); - LOGGER.info("New segment CRC: {}, existed segment CRC: {} for segment: {} in table: {}", newCrc, existedCrc, - segmentName, tableNameWithType); - return newCrc != existedCrc; - } - - private String downloadSegmentToLocal(String uri, PinotCrypter crypter, String tableName, String segmentName) - throws Exception { - File tempDir = new File(new File(_instanceDataManager.getSegmentFileDirectory(), tableName), - "tmp-" + segmentName + "-" + UUID.randomUUID()); - FileUtils.forceMkdir(tempDir); - File tempDownloadFile = new File(tempDir, segmentName + ENCODED_SUFFIX); - File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX); - File tempSegmentDir = new File(tempDir, segmentName); - try { - try { - SegmentFetcherFactory.fetchSegmentToLocal(uri, tempDownloadFile); - if (crypter != null) { - crypter.decrypt(tempDownloadFile, tempTarFile); - } else { - tempTarFile = tempDownloadFile; - } - LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, - tableName, uri, tempTarFile, tempTarFile.length()); - } catch (AttemptsExceededException e) { - LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName, - tableName, uri, tempTarFile); - _serverMetrics.addMeteredTableValue(tableName, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L); - Utils.rethrowException(e); - return null; - } - - try { - // If an exception is thrown when untarring, it means the tar file is broken OR not found after the retry. - // Thus, there's no need to retry again. - File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0); - File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName); - if (indexDir.exists()) { - LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName); - FileUtils.deleteDirectory(indexDir); - } - FileUtils.moveDirectory(tempIndexDir, indexDir); - LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir); - return indexDir.getAbsolutePath(); - } catch (Exception e) { - LOGGER.error("Exception when untarring segment: {} for table: {} from {} to {}", segmentName, tableName, - tempTarFile, tempSegmentDir); - _serverMetrics.addMeteredTableValue(tableName, ServerMeter.UNTAR_FAILURES, 1L); - Utils.rethrowException(e); - return null; - } - } finally { - FileUtils.deleteQuietly(tempDir); - } - } - - public String getSegmentLocalDirectory(String tableName, String segmentId) { - return _instanceDataManager.getSegmentDataDirectory() + "/" + tableName + "/" + segmentId; - } -} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java index 97f1d23..ad4a386 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java @@ -40,14 +40,10 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { // We only allow limited number of segments refresh/reload happen at the same time // The reason for that is segment refresh/reload will temporarily use double-sized memory private final Semaphore _refreshThreadSemaphore; - - private final SegmentFetcherAndLoader _fetcherAndLoader; private final InstanceDataManager _instanceDataManager; private final ServerMetrics _metrics; - public SegmentMessageHandlerFactory(SegmentFetcherAndLoader fetcherAndLoader, InstanceDataManager instanceDataManager, - ServerMetrics metrics) { - _fetcherAndLoader = fetcherAndLoader; + public SegmentMessageHandlerFactory(InstanceDataManager instanceDataManager, ServerMetrics metrics) { _instanceDataManager = instanceDataManager; _metrics = metrics; int maxParallelRefreshThreads = instanceDataManager.getMaxParallelRefreshThreads(); @@ -119,7 +115,7 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { try { acquireSema(_segmentName, _logger); // The number of retry times depends on the retry count in Constants. - _fetcherAndLoader.addOrReplaceOfflineSegment(_tableNameWithType, _segmentName); + _instanceDataManager.addOrReplaceSegment(_tableNameWithType, _segmentName); result.setSuccess(true); } catch (Exception e) { _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REFRESH_FAILURES, 1); @@ -150,19 +146,11 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { acquireSema("ALL", _logger); // NOTE: the method aborts if any segment reload encounters an unhandled exception, // and can lead to inconsistent state across segments - if (_forceDownload) { - _fetcherAndLoader.replaceAllOfflineSegments(_tableNameWithType); - } else { - _instanceDataManager.reloadAllSegments(_tableNameWithType); - } + _instanceDataManager.reloadAllSegments(_tableNameWithType, _forceDownload); } else { // Reload one segment acquireSema(_segmentName, _logger); - if (_forceDownload) { - _fetcherAndLoader.addOrReplaceOfflineSegment(_tableNameWithType, _segmentName, true); - } else { - _instanceDataManager.reloadSegment(_tableNameWithType, _segmentName); - } + _instanceDataManager.reloadSegment(_tableNameWithType, _segmentName, _forceDownload); } helixTaskResult.setSuccess(true); } catch (Throwable e) { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index 1a5efe5..d2e4c30 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -53,13 +53,10 @@ import org.slf4j.LoggerFactory; public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { private final String _instanceId; private final InstanceDataManager _instanceDataManager; - private final SegmentFetcherAndLoader _fetcherAndLoader; - public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager, - SegmentFetcherAndLoader fetcherAndLoader) { + public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) { _instanceId = instanceId; _instanceDataManager = instanceDataManager; - _fetcherAndLoader = fetcherAndLoader; } public static String getStateModelName() { @@ -162,7 +159,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta TableType tableType = TableNameBuilder.getTableTypeFromTableName(message.getResourceName()); Preconditions.checkNotNull(tableType); if (tableType == TableType.OFFLINE) { - _fetcherAndLoader.addOrReplaceOfflineSegment(tableNameWithType, segmentName); + _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName); } else { _instanceDataManager.addRealtimeSegment(tableNameWithType, segmentName); } @@ -208,7 +205,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta try { segmentLock.lock(); - final File segmentDir = new File(_fetcherAndLoader.getSegmentLocalDirectory(tableNameWithType, segmentName)); + File segmentDir = _instanceDataManager.getSegmentDataDirectory(tableNameWithType, segmentName); if (segmentDir.exists()) { FileUtils.deleteQuietly(segmentDir); _logger.info("Deleted segment directory {}", segmentDir); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 2bc0de8..642fa04 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -453,9 +453,11 @@ public class CommonConstants { public static class Realtime { public enum Status { // Means the segment is in CONSUMING state. - IN_PROGRESS, // Means the segment is in ONLINE state (segment completed consuming and has been saved in + IN_PROGRESS, + // Means the segment is in ONLINE state (segment completed consuming and has been saved in // segment store). - DONE, // Means the segment is uploaded to a Pinot controller by an external party. + DONE, + // Means the segment is uploaded to a Pinot controller by an external party. UPLOADED } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org