This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new decc4eba2f  Properly handle shutdown of TableDataManager (#11380)
decc4eba2f is described below

commit decc4eba2f5f1f4e13ee5f97a62d51518063a7de
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Aug 18 18:39:54 2023 -0700

    Properly handle shutdown of TableDataManager (#11380)
---
 .../helix/core/PinotHelixResourceManager.java      | 170 ++++++---------------
 .../core/data/manager/BaseTableDataManager.java    |  76 ++++++---
 .../core/data/manager/InstanceDataManager.java     |   6 -
 .../manager/offline/DimensionTableDataManager.java |  10 +-
 .../manager/offline/OfflineTableDataManager.java   |   1 +
 .../manager/realtime/RealtimeTableDataManager.java |  29 ++--
 .../tests/BaseClusterIntegrationTest.java          |  31 ++--
 .../tests/BaseClusterIntegrationTestSet.java       |  20 ---
 .../tests/BaseRealtimeClusterIntegrationTest.java  |   2 +-
 .../tests/OfflineClusterIntegrationTest.java       |   8 +-
 .../local/data/manager/TableDataManager.java       |   4 +-
 .../starter/helix/HelixInstanceDataManager.java    |  82 +++++-----
 .../SegmentOnlineOfflineStateModelFactory.java     | 125 +++++++++------
 13 files changed, 266 insertions(+), 298 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d584136740..1974d952b5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1984,62 +1984,7 @@ public class PinotHelixResourceManager {
   }

   public void deleteOfflineTable(String tableName, @Nullable String retentionPeriod) {
-    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
-    LOGGER.info("Deleting table {}: Start", offlineTableName);
-
-    // Remove the table from brokerResource
-    HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, offlineTableName);
-    LOGGER.info("Deleting table {}: Removed from broker resource", offlineTableName);
-
-    // Drop the table on servers
-    // TODO: Make this api idempotent and blocking by waiting for externalview to converge on controllers
-    //       instead of servers. This is because if externalview gets updated with significant delay,
-    //       we may have the race condition for table recreation that the new table will use the old states
-    //       (old table data manager) on the servers.
-    //       Steps needed:
-    //       1. Drop the helix resource first (set idealstate as null)
-    //       2. Wait for the externalview to converge
-    //       3. Get servers for the tenant, and send delete table message to these servers
-    deleteTableOnServer(offlineTableName);
-
-    // Drop the table
-    if (_helixAdmin.getResourcesInCluster(_helixClusterName).contains(offlineTableName)) {
-      _helixAdmin.dropResource(_helixClusterName, offlineTableName);
-      LOGGER.info("Deleting table {}: Removed helix table resource", offlineTableName);
-    }
-
-    // Remove all stored segments for the table
-    Long retentionPeriodMs = retentionPeriod != null ?
TimeUtils.convertPeriodToMillis(retentionPeriod) : null; - _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFromPropertyStore(offlineTableName), - retentionPeriodMs); - LOGGER.info("Deleting table {}: Removed stored segments", offlineTableName); - - // Remove segment metadata - ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore, offlineTableName); - LOGGER.info("Deleting table {}: Removed segment metadata", offlineTableName); - - // Remove instance partitions - InstancePartitionsUtils.removeInstancePartitions(_propertyStore, offlineTableName); - LOGGER.info("Deleting table {}: Removed instance partitions", offlineTableName); - - // Remove tier instance partitions - InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, offlineTableName); - LOGGER.info("Deleting table {}: Removed tier instance partitions", offlineTableName); - - // Remove segment lineage - SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, offlineTableName); - LOGGER.info("Deleting table {}: Removed segment lineage", offlineTableName); - - // Remove task related metadata - MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, offlineTableName); - LOGGER.info("Deleting table {}: Removed all minion task metadata", offlineTableName); - - // Remove table config - // this should always be the last step for deletion to avoid race condition in table re-create. - ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, offlineTableName); - LOGGER.info("Deleting table {}: Removed table config", offlineTableName); - - LOGGER.info("Deleting table {}: Finish", offlineTableName); + deleteTable(tableName, TableType.OFFLINE, retentionPeriod); } public void deleteRealtimeTable(String tableName) { @@ -2047,73 +1992,64 @@ public class PinotHelixResourceManager { } public void deleteRealtimeTable(String tableName, @Nullable String retentionPeriod) { - String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); - LOGGER.info("Deleting table {}: Start", realtimeTableName); + deleteTable(tableName, TableType.REALTIME, retentionPeriod); + } + + public void deleteTable(String tableName, TableType tableType, @Nullable String retentionPeriod) { + String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName); + LOGGER.info("Deleting table {}: Start", tableNameWithType); // Remove the table from brokerResource - HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, realtimeTableName); - LOGGER.info("Deleting table {}: Removed from broker resource", realtimeTableName); + HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, tableNameWithType); + LOGGER.info("Deleting table {}: Removed from broker resource", tableNameWithType); - // Drop the table on servers - // TODO: Make this api idempotent and blocking by waiting for externalview to converge on controllers - // instead of servers. Follow the same steps for offline tables. 
- deleteTableOnServer(realtimeTableName); + // Delete the table on servers + deleteTableOnServers(tableNameWithType); - // Cache the state and drop the table - Set<String> instancesForTable = null; - if (_helixAdmin.getResourcesInCluster(_helixClusterName).contains(realtimeTableName)) { - instancesForTable = getAllInstancesForTable(realtimeTableName); - _helixAdmin.dropResource(_helixClusterName, realtimeTableName); - LOGGER.info("Deleting table {}: Removed helix table resource", realtimeTableName); - } + // Remove ideal state + _helixDataAccessor.removeProperty(_keyBuilder.idealStates(tableNameWithType)); + LOGGER.info("Deleting table {}: Removed ideal state", tableNameWithType); // Remove all stored segments for the table Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null; - _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFromPropertyStore(realtimeTableName), + _segmentDeletionManager.removeSegmentsFromStore(tableNameWithType, getSegmentsFromPropertyStore(tableNameWithType), retentionPeriodMs); - LOGGER.info("Deleting table {}: Removed stored segments", realtimeTableName); + LOGGER.info("Deleting table {}: Removed stored segments", tableNameWithType); // Remove segment metadata - ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore, realtimeTableName); - LOGGER.info("Deleting table {}: Removed segment metadata", realtimeTableName); + ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore, tableNameWithType); + LOGGER.info("Deleting table {}: Removed segment metadata", tableNameWithType); // Remove instance partitions - String rawTableName = TableNameBuilder.extractRawTableName(tableName); - InstancePartitionsUtils.removeInstancePartitions(_propertyStore, - InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName)); - InstancePartitionsUtils.removeInstancePartitions(_propertyStore, - InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName)); - LOGGER.info("Deleting table {}: Removed instance partitions", realtimeTableName); + if (tableType == TableType.OFFLINE) { + InstancePartitionsUtils.removeInstancePartitions(_propertyStore, tableNameWithType); + } else { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + InstancePartitionsUtils.removeInstancePartitions(_propertyStore, + InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName)); + InstancePartitionsUtils.removeInstancePartitions(_propertyStore, + InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName)); + } + LOGGER.info("Deleting table {}: Removed instance partitions", tableNameWithType); - InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, rawTableName); - LOGGER.info("Deleting table {}: Removed tier instance partitions", realtimeTableName); + // Remove tier instance partitions + InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, tableNameWithType); + LOGGER.info("Deleting table {}: Removed tier instance partitions", tableNameWithType); // Remove segment lineage - SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, realtimeTableName); - LOGGER.info("Deleting table {}: Removed segment lineage", realtimeTableName); + SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, tableNameWithType); + LOGGER.info("Deleting table {}: Removed segment lineage", tableNameWithType); // Remove task related metadata - MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, 
realtimeTableName); - LOGGER.info("Deleting table {}: Removed all minion task metadata", realtimeTableName); - - // Remove groupId/partitionId mapping for HLC table - if (instancesForTable != null) { - for (String instance : instancesForTable) { - InstanceZKMetadata instanceZKMetadata = ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, instance); - if (instanceZKMetadata != null) { - instanceZKMetadata.removeResource(realtimeTableName); - ZKMetadataProvider.setInstanceZKMetadata(_propertyStore, instanceZKMetadata); - } - } - } - LOGGER.info("Deleting table {}: Removed groupId/partitionId mapping for HLC table", realtimeTableName); + MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore, tableNameWithType); + LOGGER.info("Deleting table {}: Removed all minion task metadata", tableNameWithType); // Remove table config - // this should always be the last step for deletion to avoid race condition in table re-create. - ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, realtimeTableName); - LOGGER.info("Deleting table {}: Removed table config", realtimeTableName); + // NOTE: This should always be the last step for deletion to avoid race condition in table re-create + ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, tableNameWithType); + LOGGER.info("Deleting table {}: Removed table config", tableNameWithType); - LOGGER.info("Deleting table {}: Finish", realtimeTableName); + LOGGER.info("Deleting table {}: Finish", tableNameWithType); } /** @@ -2152,15 +2088,6 @@ public class PinotHelixResourceManager { } } - private Set<String> getAllInstancesForTable(String tableNameWithType) { - Set<String> instanceSet = new HashSet<>(); - IdealState tableIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); - for (String partition : tableIdealState.getPartitionSet()) { - instanceSet.addAll(tableIdealState.getInstanceSet(partition)); - } - return instanceSet; - } - /** * Returns the ZK metdata for the given jobId and jobType * @param jobId the id of the job @@ -2443,10 +2370,16 @@ public class PinotHelixResourceManager { } /** - * Delete the table on servers by sending table deletion message + * Delete the table on servers by sending table deletion messages. 
*/ - private void deleteTableOnServer(String tableNameWithType) { - LOGGER.info("Sending delete table message for table: {}", tableNameWithType); + private void deleteTableOnServers(String tableNameWithType) { + // External view can be null for newly created table, skip sending messages + if (_helixDataAccessor.getProperty(_keyBuilder.externalView(tableNameWithType)) == null) { + LOGGER.warn("No delete table message sent for newly created table: {} without external view", tableNameWithType); + return; + } + + LOGGER.info("Sending delete table messages for table: {}", tableNameWithType); Criteria recipientCriteria = new Criteria(); recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); recipientCriteria.setInstanceName("%"); @@ -2455,13 +2388,6 @@ public class PinotHelixResourceManager { TableDeletionMessage tableDeletionMessage = new TableDeletionMessage(tableNameWithType); ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - // Externalview can be null for newly created table, skip sending the message - if (_helixZkManager.getHelixDataAccessor() - .getProperty(_helixZkManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType)) == null) { - LOGGER.warn("No delete table message sent for newly created table: {} as the externalview is null.", - tableNameWithType); - return; - } // Infinite timeout on the recipient int timeoutMs = -1; int numMessagesSent = messagingService.send(recipientCriteria, tableDeletionMessage, null, timeoutMs); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 0187fb6836..0fb1b26dc3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -179,7 +180,7 @@ public abstract class BaseTableDataManager implements TableDataManager { protected abstract void doInit(); @Override - public void start() { + public synchronized void start() { _logger.info("Starting table data manager for table: {}", _tableNameWithType); doStart(); _logger.info("Started table data manager for table: {}", _tableNameWithType); @@ -188,7 +189,11 @@ public abstract class BaseTableDataManager implements TableDataManager { protected abstract void doStart(); @Override - public void shutDown() { + public synchronized void shutDown() { + if (_shutDown) { + _logger.info("Table data manager for table: {} is already shut down", _tableNameWithType); + return; + } _logger.info("Shutting down table data manager for table: {}", _tableNameWithType); _shutDown = true; doShutdown(); @@ -197,6 +202,18 @@ public abstract class BaseTableDataManager implements TableDataManager { protected abstract void doShutdown(); + /** + * Releases and removes all segments tracked by the table data manager. 
+ */ + protected void releaseAndRemoveAllSegments() { + Iterator<SegmentDataManager> iterator = _segmentDataManagerMap.values().iterator(); + while (iterator.hasNext()) { + SegmentDataManager segmentDataManager = iterator.next(); + iterator.remove(); + releaseSegment(segmentDataManager); + } + } + @Override public boolean isShutDown() { return _shutDown; @@ -214,6 +231,8 @@ public abstract class BaseTableDataManager implements TableDataManager { @Override public void addSegment(ImmutableSegment immutableSegment) { String segmentName = immutableSegment.getSegmentName(); + Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", + segmentName, _tableNameWithType); _logger.info("Adding immutable segment: {} to table: {}", segmentName, _tableNameWithType); _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, immutableSegment.getSegmentMetadata().getTotalDocs()); @@ -232,6 +251,8 @@ public abstract class BaseTableDataManager implements TableDataManager { @Override public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig) throws Exception { + Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", + indexDir.getName(), _tableNameWithType); indexLoadingConfig.setTableDataDir(_tableDataDir); indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs()); addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, indexLoadingConfig.getSchema())); @@ -251,6 +272,12 @@ public abstract class BaseTableDataManager implements TableDataManager { */ @Override public void removeSegment(String segmentName) { + // Allow removing segment after shutdown so that we can remove the segment when the table is deleted + if (_shutDown) { + _logger.info("Table data manager is already shut down, skip removing segment: {} from table: {}", segmentName, + _tableNameWithType); + return; + } _logger.info("Removing segment: {} from table: {}", segmentName, _tableNameWithType); SegmentDataManager segmentDataManager = unregisterSegment(segmentName); if (segmentDataManager != null) { @@ -364,6 +391,9 @@ public abstract class BaseTableDataManager implements TableDataManager { public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata, SegmentMetadata localMetadata, @Nullable Schema schema, boolean forceDownload) throws Exception { + Preconditions.checkState(!_shutDown, + "Table data manager is already shut down, cannot reload segment: %s of table: %s", segmentName, + _tableNameWithType); String segmentTier = getSegmentCurrentTier(segmentName); indexLoadingConfig.setSegmentTier(segmentTier); indexLoadingConfig.setTableDataDir(_tableDataDir); @@ -390,8 +420,7 @@ public abstract class BaseTableDataManager implements TableDataManager { SegmentDirectory segmentDirectory = initSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), indexLoadingConfig); // We should first try to reuse existing segment directory - if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier, segmentDirectory, indexLoadingConfig, - schema)) { + if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier, segmentDirectory, indexLoadingConfig, schema)) { LOGGER.info("Reloading segment: {} of table: {} using existing segment directory as no reprocessing needed", segmentName, _tableNameWithType); // No reprocessing needed, reuse the same segment @@ -432,9 +461,8 @@ public abstract 
class BaseTableDataManager implements TableDataManager { } } - private boolean canReuseExistingDirectoryForReload(SegmentZKMetadata segmentZKMetadata, - String currentSegmentTier, SegmentDirectory segmentDirectory, IndexLoadingConfig indexLoadingConfig, - Schema schema) + private boolean canReuseExistingDirectoryForReload(SegmentZKMetadata segmentZKMetadata, String currentSegmentTier, + SegmentDirectory segmentDirectory, IndexLoadingConfig indexLoadingConfig, Schema schema) throws Exception { SegmentDirectoryLoader segmentDirectoryLoader = SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader()); @@ -446,6 +474,9 @@ public abstract class BaseTableDataManager implements TableDataManager { public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata) throws Exception { + Preconditions.checkState(!_shutDown, + "Table data manager is already shut down, cannot add/replace segment: %s of table: %s", segmentName, + _tableNameWithType); if (localMetadata != null && hasSameCRC(zkMetadata, localMetadata)) { LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName, _tableNameWithType, localMetadata.getCrc()); @@ -595,12 +626,13 @@ public abstract class BaseTableDataManager implements TableDataManager { } // not thread safe. Caller should invoke it with safe concurrency control. - protected void downloadFromPeersWithoutStreaming(String segmentName, SegmentZKMetadata zkMetadata, - File destTarFile) throws Exception { + protected void downloadFromPeersWithoutStreaming(String segmentName, SegmentZKMetadata zkMetadata, File destTarFile) + throws Exception { Preconditions.checkArgument(_tableDataManagerConfig.getTablePeerDownloadScheme() != null, - "Download peers require non null peer download scheme"); - List<URI> peerSegmentURIs = PeerServerSegmentFinder.getPeerServerURIs(segmentName, - _tableDataManagerConfig.getTablePeerDownloadScheme(), _helixManager, _tableNameWithType); + "Download peers require non null peer download scheme"); + List<URI> peerSegmentURIs = + PeerServerSegmentFinder.getPeerServerURIs(segmentName, _tableDataManagerConfig.getTablePeerDownloadScheme(), + _helixManager, _tableNameWithType); if (peerSegmentURIs.isEmpty()) { String msg = String.format("segment %s doesn't have any peers", segmentName); LOGGER.warn(msg); @@ -611,10 +643,10 @@ public abstract class BaseTableDataManager implements TableDataManager { // Next download the segment from a randomly chosen server using configured scheme. 
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(peerSegmentURIs, destTarFile, zkMetadata.getCrypterName()); LOGGER.info("Fetched segment {} from peers: {} to: {} of size: {}", segmentName, peerSegmentURIs, destTarFile, - destTarFile.length()); + destTarFile.length()); } catch (AttemptsExceededException e) { LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from peers {} to: {}", segmentName, - _tableNameWithType, peerSegmentURIs, destTarFile); + _tableNameWithType, peerSegmentURIs, destTarFile); _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES, 1L); throw e; } @@ -636,12 +668,12 @@ public abstract class BaseTableDataManager implements TableDataManager { String uri = zkMetadata.getDownloadUrl(); AtomicInteger attempts = new AtomicInteger(0); try { - File ret = SegmentFetcherFactory.fetchAndStreamUntarToLocal(uri, tempRootDir, maxStreamRateInByte, attempts); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES, - attempts.get()); - LOGGER.info("Downloaded and untarred segment: {} for table: {} from: {} attempts: {}", segmentName, - _tableNameWithType, uri, attempts.get()); - return ret; + File ret = SegmentFetcherFactory.fetchAndStreamUntarToLocal(uri, tempRootDir, maxStreamRateInByte, attempts); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES, + attempts.get()); + LOGGER.info("Downloaded and untarred segment: {} for table: {} from: {} attempts: {}", segmentName, + _tableNameWithType, uri, attempts.get()); + return ret; } catch (Exception e) { _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES, attempts.get()); @@ -771,6 +803,10 @@ public abstract class BaseTableDataManager implements TableDataManager { @Override public boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata) { + Preconditions.checkState(!_shutDown, + "Table data manager is already shut down, cannot load existing segment: %s of table: %s", segmentName, + _tableNameWithType); + // Try to recover the segment from potential segment reloading failure. String segmentTier = zkMetadata.getTier(); File indexDir = getSegmentDataDir(segmentName, segmentTier, indexLoadingConfig.getTableConfig()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index bbc392d4d0..ffb5923411 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -74,12 +74,6 @@ public interface InstanceDataManager { void deleteTable(String tableNameWithType) throws Exception; - /** - * Adds a segment from local disk into an OFFLINE table. - */ - void addOfflineSegment(String offlineTableName, String segmentName, File indexDir) - throws Exception; - /** * Adds a segment into an REALTIME table. * <p>The segment might be committed or under consuming. 
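
The BaseTableDataManager changes above all follow one pattern: shutDown() becomes synchronized and idempotent, segments are unregistered from the map before being released, and every mutating entry point fails fast once the shutdown flag is set. A minimal, self-contained sketch of that pattern follows; the class and member names here are illustrative stand-ins, not code from this patch:

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

class ShutdownGuardedManager {
  private final ConcurrentHashMap<String, AutoCloseable> _segments = new ConcurrentHashMap<>();
  private volatile boolean _shutDown;

  synchronized void shutDown() throws Exception {
    if (_shutDown) {
      return;  // Idempotent: repeated shutDown() calls are no-ops, matching the new contract
    }
    _shutDown = true;
    // Unregister each segment before releasing it so concurrent readers never see a released one
    Iterator<AutoCloseable> iterator = _segments.values().iterator();
    while (iterator.hasNext()) {
      AutoCloseable segment = iterator.next();
      iterator.remove();
      segment.close();
    }
  }

  void addSegment(String name, AutoCloseable segment) {
    // Fail fast after shutdown, mirroring the Preconditions.checkState(!_shutDown, ...) guards above
    if (_shutDown) {
      throw new IllegalStateException("Already shut down, cannot add segment: " + name);
    }
    _segments.put(name, segment);
  }
}

Note that in the patch itself segment removal (unlike addition) deliberately stays allowed after shutdown, so a deleted table can still drop its segments.
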
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index a9635f2a66..b78874a8e3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -117,8 +117,10 @@ public class DimensionTableDataManager extends OfflineTableDataManager { @Override public void addSegment(ImmutableSegment immutableSegment) { - super.addSegment(immutableSegment); String segmentName = immutableSegment.getSegmentName(); + Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", + segmentName, _tableNameWithType); + super.addSegment(immutableSegment); try { if (loadLookupTable()) { _logger.info("Successfully loaded lookup table after adding segment: {}", segmentName); @@ -134,6 +136,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager { @Override public void removeSegment(String segmentName) { + // Allow removing segment after shutdown so that we can remove the segment when the table is deleted + if (_shutDown) { + _logger.info("Table data manager is already shut down, skip removing segment: {}", segmentName); + return; + } super.removeSegment(segmentName); try { if (loadLookupTable()) { @@ -150,6 +157,7 @@ public class DimensionTableDataManager extends OfflineTableDataManager { @Override protected void doShutdown() { + releaseAndRemoveAllSegments(); closeDimensionTable(_dimensionTable.get()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java index 7e17da42cb..aef0d80b9b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java @@ -38,5 +38,6 @@ public class OfflineTableDataManager extends BaseTableDataManager { @Override protected void doShutdown() { + releaseAndRemoveAllSegments(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 3bf7caf24b..ebd5aee4f2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -54,7 +54,6 @@ import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager; import org.apache.pinot.segment.local.dedup.TableDedupMetadataManagerFactory; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; -import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; @@ -251,18 +250,14 @@ public class RealtimeTableDataManager extends BaseTableDataManager { if (_tableUpsertMetadataManager != null) { // Stop the 
upsert metadata manager first to prevent removing metadata when destroying segments _tableUpsertMetadataManager.stop(); - for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) { - segmentDataManager.destroy(); - } + releaseAndRemoveAllSegments(); try { _tableUpsertMetadataManager.close(); } catch (IOException e) { _logger.warn("Cannot close upsert metadata manager properly for table: {}", _tableNameWithType, e); } } else { - for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) { - segmentDataManager.destroy(); - } + releaseAndRemoveAllSegments(); } if (_leaseExtender != null) { _leaseExtender.shutDown(); @@ -382,6 +377,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { @Override public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata segmentZKMetadata) throws Exception { + Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", + segmentName, _tableNameWithType); SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); if (segmentDataManager != null) { _logger.warn("Skipping adding existing segment: {} for table: {} with data manager class: {}", segmentName, @@ -438,8 +435,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { int partitionGroupId = llcSegmentName.getPartitionGroupId(); Semaphore semaphore = _partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new Semaphore(1)); PartitionUpsertMetadataManager partitionUpsertMetadataManager = - _tableUpsertMetadataManager != null ? _tableUpsertMetadataManager.getOrCreatePartitionManager( - partitionGroupId) : null; + _tableUpsertMetadataManager != null ? _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId) + : null; PartitionDedupMetadataManager partitionDedupMetadataManager = _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null; @@ -488,6 +485,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager { @Override public void addSegment(ImmutableSegment immutableSegment) { + String segmentName = immutableSegment.getSegmentName(); + Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", + segmentName, _tableNameWithType); if (isUpsertEnabled()) { handleUpsert(immutableSegment); return; @@ -648,17 +648,6 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } } - /** - * Replaces a committed HLC REALTIME segment. - */ - public void replaceHLSegment(SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig) - throws Exception { - ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, _tableNameWithType, segmentZKMetadata); - File indexDir = new File(_indexDir, segmentZKMetadata.getSegmentName()); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema)); - } - /** * Replaces a committed LLC REALTIME segment. 
*/ diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index da466e4e93..408558d7e5 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -39,6 +39,7 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder; import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties; +import org.apache.pinot.server.starter.helix.BaseServerStarter; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.FieldConfig; @@ -591,8 +592,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { */ protected List<File> unpackTarData(String tarFileName, File outputDir) throws Exception { - InputStream inputStream = - BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName); + InputStream inputStream = BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName); Assert.assertNotNull(inputStream); return TarGzCompressionUtils.untar(inputStream, outputDir); } @@ -618,9 +618,8 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { String kafkaBroker = "localhost:" + getKafkaPort(); StreamDataProducer producer = null; try { - producer = - StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, - getDefaultKafkaProducerProperties(kafkaBroker)); + producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, + getDefaultKafkaProducerProperties(kafkaBroker)); ClusterIntegrationTestUtils.pushCsvIntoKafka(csvFile, kafkaTopic, partitionColumnIndex, injectTombstones(), producer); } catch (Exception e) { @@ -635,9 +634,8 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { String kafkaBroker = "localhost:" + getKafkaPort(); StreamDataProducer producer = null; try { - producer = - StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, - getDefaultKafkaProducerProperties(kafkaBroker)); + producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, + getDefaultKafkaProducerProperties(kafkaBroker)); ClusterIntegrationTestUtils.pushCsvIntoKafka(csvRecords, kafkaTopic, partitionColumnIndex, injectTombstones(), producer); } catch (Exception e) { @@ -734,11 +732,24 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { protected void waitForDocsLoaded(long timeoutMs, boolean raiseError, String tableName) { final long countStarResult = getCountStarResult(); - TestUtils.waitForCondition( - () -> getCurrentCountStarResult(tableName) == countStarResult, 100L, timeoutMs, + TestUtils.waitForCondition(() -> getCurrentCountStarResult(tableName) == countStarResult, 100L, timeoutMs, "Failed to load " + countStarResult + " documents", raiseError, Duration.ofMillis(timeoutMs / 10)); } + /** + * Wait for servers to remove the table data manager after the table is deleted. 
+ */ + protected void waitForTableDataManagerRemoved(String tableNameWithType) { + TestUtils.waitForCondition(aVoid -> { + for (BaseServerStarter serverStarter : _serverStarters) { + if (serverStarter.getServerInstance().getInstanceDataManager().getTableDataManager(tableNameWithType) != null) { + return false; + } + } + return true; + }, 60_000L, "Failed to remove table data manager for table: " + tableNameWithType); + } + /** * Reset table utils. */ diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java index ab79381b05..e850b4ba2b 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -36,7 +36,6 @@ import org.apache.pinot.client.ResultSetGroup; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.query.utils.idset.IdSet; import org.apache.pinot.core.query.utils.idset.IdSets; -import org.apache.pinot.server.starter.helix.BaseServerStarter; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; @@ -82,25 +81,6 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati return DEFAULT_NUM_QUERIES_TO_GENERATE; } - /** - * Test server table data manager deletion after the table is dropped - */ - protected void cleanupTestTableDataManager(String tableNameWithType) { - TestUtils.waitForCondition(aVoid -> { - try { - for (BaseServerStarter serverStarter : _serverStarters) { - if (serverStarter.getServerInstance().getInstanceDataManager().getTableDataManager(tableNameWithType) - != null) { - return false; - } - } - return true; - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to delete table data managers"); - } - /** * Test hard-coded queries. 
* @throws Exception diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java index c95d5d7f2c..83adb4dfe9 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java @@ -180,7 +180,7 @@ public abstract class BaseRealtimeClusterIntegrationTest extends BaseClusterInte public void tearDown() throws Exception { dropRealtimeTable(getTableName()); - cleanupTestTableDataManager(TableNameBuilder.REALTIME.tableNameWithType(getTableName())); + waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(getTableName())); stopServer(); stopBroker(); stopController(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index e1ad0880c6..0debb9d09a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -432,7 +432,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet } waitForNumOfSegmentsBecomeOnline(offlineTableName, 1); dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE); - cleanupTestTableDataManager(offlineTableName); + waitForTableDataManagerRemoved(offlineTableName); } private void waitForNumOfSegmentsBecomeOnline(String tableNameWithType, int numSegments) @@ -2386,15 +2386,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet for (int i = 2; i < 20; i++) { query = String.format("SELECT distinctCountHLL(FlightNum, %d) FROM mytable ", i); assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 2]); - assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), - expectedResults[i - 2]); + assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[i - 2]); } // Default HLL is set as log2m=12 query = "SELECT distinctCountHLL(FlightNum) FROM mytable "; assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[10]); - assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), - expectedResults[10]); + assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(), expectedResults[10]); } @Test diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 45965f4bc4..2de9c586c0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -61,8 +61,8 @@ public interface TableDataManager { void start(); /** - * Shuts down the table data manager. Should be called only once. After calling shut down, no other method should be - * called. + * Shuts down the table data manager. 
After calling shut down, no other method should be called. + * NOTE: Shut down might be called multiple times. The implementation should be able to handle that. */ void shutDown(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index bdc4baf58d..d5c0c8afd8 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.function.Supplier; @@ -42,7 +41,9 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; import org.apache.helix.model.ExternalView; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -213,26 +214,6 @@ public class HelixInstanceDataManager implements InstanceDataManager { LOGGER.info("Helix instance data manager shut down"); } - @Override - public void addOfflineSegment(String offlineTableName, String segmentName, File indexDir) - throws Exception { - LOGGER.info("Adding segment: {} to table: {}", segmentName, offlineTableName); - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName); - Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", offlineTableName); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig); - SegmentZKMetadata zkMetadata = - ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, offlineTableName, segmentName); - Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for offline segment: %s, table: %s", - segmentName, offlineTableName); - - IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema); - indexLoadingConfig.setSegmentTier(zkMetadata.getTier()); - - _tableDataManagerMap.computeIfAbsent(offlineTableName, k -> createTableDataManager(k, tableConfig)) - .addSegment(indexDir, indexLoadingConfig); - LOGGER.info("Added segment: {} to table: {}", segmentName, offlineTableName); - } - @Override public void addRealtimeSegment(String realtimeTableName, String segmentName) throws Exception { @@ -264,28 +245,43 @@ public class HelixInstanceDataManager implements InstanceDataManager { @Override public void deleteTable(String tableNameWithType) throws Exception { - // Wait externalview to converge - long endTimeMs = System.currentTimeMillis() + _externalViewDroppedMaxWaitMs; - do { - ExternalView externalView = _helixManager.getHelixDataAccessor() - .getProperty(_helixManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType)); - if (externalView == null) { - LOGGER.info("ExternalView converged for the table to delete: {}", tableNameWithType); - _tableDataManagerMap.compute(tableNameWithType, (k, v) -> { - 
if (v != null) { - v.shutDown(); - LOGGER.info("Removed table: {}", tableNameWithType); - } else { - LOGGER.warn("Failed to find table data manager for table: {}, skip removing the table", tableNameWithType); - } - return null; - }); - return; - } - Thread.sleep(_externalViewDroppedCheckInternalMs); - } while (System.currentTimeMillis() < endTimeMs); - throw new TimeoutException( - "Timeout while waiting for ExternalView to converge for the table to delete: " + tableNameWithType); + TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); + if (tableDataManager == null) { + LOGGER.warn("Failed to find table data manager for table: {}, skip deleting the table", tableNameWithType); + return; + } + LOGGER.info("Shutting down table data manager for table: {}", tableNameWithType); + tableDataManager.shutDown(); + LOGGER.info("Finished shutting down table data manager for table: {}", tableNameWithType); + + try { + // Wait for external view to disappear or become empty before removing the table data manager. + // + // When creating the table, controller will check whether the external view exists, and allow table creation only + // if it doesn't exist. If the table is recreated just after external view disappeared, there is a small chance + // that server won't realize the external view is removed because it is recreated before the server checks it. In + // order to handle this scenario, we want to remove the table data manager when the external view exists but is + // empty. + HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor(); + PropertyKey externalViewKey = helixDataAccessor.keyBuilder().externalView(tableNameWithType); + long endTimeMs = System.currentTimeMillis() + _externalViewDroppedMaxWaitMs; + do { + ExternalView externalView = helixDataAccessor.getProperty(externalViewKey); + if (externalView == null) { + LOGGER.info("ExternalView is dropped for table: {}", tableNameWithType); + return; + } + if (externalView.getRecord().getMapFields().isEmpty()) { + LOGGER.info("ExternalView is empty for table: {}", tableNameWithType); + return; + } + Thread.sleep(_externalViewDroppedCheckInternalMs); + } while (System.currentTimeMillis() < endTimeMs); + LOGGER.warn("ExternalView still exists after {}ms for table: {}", _externalViewDroppedMaxWaitMs, + tableNameWithType); + } finally { + _tableDataManagerMap.remove(tableNameWithType); + } } @Override diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index b85d13b953..510f4fa74d 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -29,8 +29,6 @@ import org.apache.pinot.common.Utils; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; -import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.common.utils.SegmentName; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; @@ -75,49 +73,61 @@ public class 
SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta @Transition(from = "OFFLINE", to = "CONSUMING") public void onBecomeConsumingFromOffline(Message message, NotificationContext context) { - Preconditions.checkState(SegmentName.isLowLevelConsumerSegmentName(message.getPartitionName()), - "Tried to go into CONSUMING state on non-low level segment"); _logger.info("SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message); - // We do the same processing as usual for going to the consuming state, which adds the segment to the table data - // manager and starts Kafka consumption - onBecomeOnlineFromOffline(message, context); + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName); + } catch (Exception e) { + String errorMessage = + String.format("Caught exception in state transition OFFLINE -> CONSUMING for table: %s, segment: %s", + realtimeTableName, segmentName); + _logger.error(errorMessage, e); + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); + if (tableDataManager != null) { + tableDataManager.addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + } + Utils.rethrowException(e); + } } @Transition(from = "CONSUMING", to = "ONLINE") public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) { + _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message); String realtimeTableName = message.getResourceName(); - String segmentNameStr = message.getPartitionName(); - LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); - + String segmentName = message.getPartitionName(); TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); - Preconditions.checkNotNull(tableDataManager); - tableDataManager.onConsumingToOnline(segmentNameStr); - SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentNameStr); + Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName); + tableDataManager.onConsumingToOnline(segmentName); + SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentName); // For this transition to be correct in helix, we should already have a segment that is consuming - if (acquiredSegment == null) { - throw new RuntimeException("Segment " + segmentNameStr + " + not present "); - } + Preconditions.checkState(acquiredSegment != null, "Failed to find segment: %s in table: %s", segmentName, + realtimeTableName); // TODO: https://github.com/apache/pinot/issues/10049 try { if (!(acquiredSegment instanceof LLRealtimeSegmentDataManager)) { // We found an LLC segment that is not consuming right now, must be that we already swapped it with a // segment that has been built. Nothing to do for this state transition. - _logger - .info("Segment {} not an instance of LLRealtimeSegmentDataManager. Reporting success for the transition", - acquiredSegment.getSegmentName()); + _logger.info( + "Segment {} not an instance of LLRealtimeSegmentDataManager. 
Reporting success for the transition", + acquiredSegment.getSegmentName()); return; } LLRealtimeSegmentDataManager segmentDataManager = (LLRealtimeSegmentDataManager) acquiredSegment; - SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider - .getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName, segmentNameStr); + SegmentZKMetadata segmentZKMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName, + segmentName); segmentDataManager.goOnlineFromConsuming(segmentZKMetadata); - } catch (InterruptedException e) { - String errorMessage = String.format("State transition interrupted for segment %s.", segmentNameStr); - _logger.warn(errorMessage, e); - tableDataManager - .addSegmentError(segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); - throw new RuntimeException(e); + } catch (Exception e) { + String errorMessage = + String.format("Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s", + realtimeTableName, segmentName); + _logger.error(errorMessage, e); + tableDataManager.addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + Utils.rethrowException(e); } finally { tableDataManager.releaseSegment(acquiredSegment); } @@ -131,7 +141,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta try { _instanceDataManager.offloadSegment(realtimeTableName, segmentName); } catch (Exception e) { - _logger.error("Caught exception in state transition from CONSUMING -> OFFLINE for resource: {}, partition: {}", + _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}", realtimeTableName, segmentName, e); Utils.rethrowException(e); } @@ -141,15 +151,16 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) { _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message); String realtimeTableName = message.getResourceName(); - String segmentNameStr = message.getPartitionName(); + String segmentName = message.getPartitionName(); TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); - Preconditions.checkNotNull(tableDataManager); - tableDataManager.onConsumingToDropped(segmentNameStr); + Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName); + tableDataManager.onConsumingToDropped(segmentName); try { - onBecomeOfflineFromConsuming(message, context); - onBecomeDroppedFromOffline(message, context); - } catch (final Exception e) { - _logger.error("Caught exception on CONSUMING -> DROPPED state transition", e); + _instanceDataManager.offloadSegment(realtimeTableName, segmentName); + _instanceDataManager.deleteSegment(realtimeTableName, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}", + realtimeTableName, segmentName, e); Utils.rethrowException(e); } } @@ -160,7 +171,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta String tableNameWithType = message.getResourceName(); String segmentName = message.getPartitionName(); try { - TableType tableType = TableNameBuilder.getTableTypeFromTableName(message.getResourceName()); + TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType); Preconditions.checkNotNull(tableType); if (tableType == TableType.OFFLINE) { _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName); @@ -168,14 +179,14 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta _instanceDataManager.addRealtimeSegment(tableNameWithType, segmentName); } } catch (Exception e) { - String errorMessage = String - .format("Caught exception in state transition from OFFLINE -> ONLINE for resource: %s, partition: %s", + String errorMessage = + String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s", tableNameWithType, segmentName); _logger.error(errorMessage, e); TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); if (tableDataManager != null) { - tableDataManager - .addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + tableDataManager.addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); } Utils.rethrowException(e); } @@ -191,7 +202,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta try { _instanceDataManager.offloadSegment(tableNameWithType, segmentName); } catch (Exception e) { - _logger.error("Caught exception in state transition from ONLINE -> OFFLINE for resource: {}, partition: {}", + _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}", tableNameWithType, segmentName, e); Utils.rethrowException(e); } @@ -205,8 +216,9 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta String segmentName = message.getPartitionName(); try { _instanceDataManager.deleteSegment(tableNameWithType, segmentName); - } catch (final Exception e) { - _logger.error("Cannot drop the segment : " + segmentName + " from server!\n" + e.getMessage(), e); + } catch (Exception e) { + _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); Utils.rethrowException(e); } } @@ -214,18 +226,35 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta @Transition(from = "ONLINE", to = "DROPPED") public void onBecomeDroppedFromOnline(Message message, NotificationContext context) { _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); try { - onBecomeOfflineFromOnline(message, context); - onBecomeDroppedFromOffline(message, context); - } catch (final Exception e) { - _logger.error("Caught exception on ONLINE -> DROPPED state transition", e); + _instanceDataManager.offloadSegment(tableNameWithType, segmentName); + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); Utils.rethrowException(e); } } @Transition(from = "ERROR", to = "OFFLINE") public void onBecomeOfflineFromError(Message message, NotificationContext context) { - _logger.info("Resetting the state for segment:{} from ERROR to OFFLINE", message.getPartitionName()); + _logger.info("SegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : " + message); + } + + @Transition(from = "ERROR", to = "DROPPED") + public void 
onBecomeDroppedFromError(Message message, NotificationContext context) {
+      _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
   }
 }
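
Taken together, the HelixInstanceDataManager#deleteTable() change above inverts the old ordering: the table data manager is shut down first, then the server waits for the external view to disappear (or become empty, to cover a fast re-create), and the manager reference is always dropped in a finally block. A condensed, self-contained sketch of that ordering, with placeholder types and a placeholder fetchExternalView() standing in for the Helix accessors:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DeleteTableFlow {
  interface TableManager {
    void shutDown();
  }

  private final Map<String, TableManager> _managers = new ConcurrentHashMap<>();

  // Placeholder for the Helix external-view read; null means the view has been dropped
  private Map<String, String> fetchExternalView(String table) {
    return null;
  }

  void deleteTable(String table, long maxWaitMs, long checkIntervalMs) throws InterruptedException {
    TableManager manager = _managers.get(table);
    if (manager == null) {
      return;  // Table was never hosted on this server, nothing to clean up
    }
    manager.shutDown();  // Idempotent after the BaseTableDataManager change
    try {
      long deadline = System.currentTimeMillis() + maxWaitMs;
      do {
        Map<String, String> view = fetchExternalView(table);
        if (view == null || view.isEmpty()) {
          return;  // View dropped or emptied: a re-created table will start from a clean state
        }
        Thread.sleep(checkIntervalMs);
      } while (System.currentTimeMillis() < deadline);
    } finally {
      _managers.remove(table);  // Always drop the stale reference, even on timeout
    }
  }
}

The design point is the finally block: even if the external view never converges within the timeout, the stale manager is removed, so a re-created table cannot pick up the old table data manager.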