This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ebd7509080 Enhance table deletion on server to prevent accessing old table data manager (#14334) ebd7509080 is described below commit ebd7509080bbdc81107fbaa76374ea82925315f3 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Oct 30 16:11:44 2024 -0700 Enhance table deletion on server to prevent accessing old table data manager (#14334) --- .../pinot/common/metadata/ZKMetadataProvider.java | 10 +++ .../core/data/manager/BaseTableDataManager.java | 8 +- .../core/data/manager/InstanceDataManager.java | 2 +- .../manager/offline/TableDataManagerProvider.java | 4 +- .../local/data/manager/TableDataManager.java | 4 +- .../starter/helix/HelixInstanceDataManager.java | 100 ++++++++++----------- .../helix/HelixInstanceDataManagerConfig.java | 25 ++---- .../helix/SegmentMessageHandlerFactory.java | 9 +- .../config/instance/InstanceDataManagerConfig.java | 6 +- 9 files changed, 79 insertions(+), 89 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 7d1143b0cb..0729533871 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -478,6 +478,16 @@ public class ZKMetadataProvider { AccessOption.PERSISTENT), replaceVariables); } + @Nullable + public static ImmutablePair<TableConfig, Stat> getTableConfigWithStat(ZkHelixPropertyStore<ZNRecord> propertyStore, + String tableNameWithType) { + Stat tableConfigStat = new Stat(); + TableConfig tableConfig = toTableConfig( + propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType), tableConfigStat, + AccessOption.PERSISTENT)); + return tableConfig != null ? ImmutablePair.of(tableConfig, tableConfigStat) : null; + } + /** * @return a pair of table config and current version from znRecord, null if table config does not exist. */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 9cd14272d8..10ff609b44 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.cache.LoadingCache; import java.io.File; import java.io.IOException; import java.net.URI; @@ -111,9 +110,8 @@ public abstract class BaseTableDataManager implements TableDataManager { // Semaphore to restrict the maximum number of parallel segment downloads for a table private Semaphore _segmentDownloadSemaphore; - // Fixed size LRU cache with TableName - SegmentName pair as key, and segment related - // errors as the value. - protected LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache; + // Fixed size LRU cache with TableName - SegmentName pair as key, and segment related errors as the value. + protected Cache<Pair<String, String>, SegmentErrorInfo> _errorCache; // Cache used for identifying segments which could not be acquired since they were recently deleted. protected Cache<String, String> _recentlyDeletedSegments; @@ -122,7 +120,7 @@ public abstract class BaseTableDataManager implements TableDataManager { @Override public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks, TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor, - @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) { + @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache) { LOGGER.info("Initializing table data manager for table: {}", tableConfig.getTableName()); _instanceDataManagerConfig = instanceDataManagerConfig; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index 96d8534a65..73f497582e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -71,7 +71,7 @@ public interface InstanceDataManager { /** * Delete a table. */ - void deleteTable(String tableNameWithType) + void deleteTable(String tableNameWithType, long deletionTimeMs) throws Exception; /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java index e2b9a190de..302df13509 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.core.data.manager.offline; -import com.google.common.cache.LoadingCache; +import com.google.common.cache.Cache; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; @@ -61,7 +61,7 @@ public class TableDataManagerProvider { } public TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor, - @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, + @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) { TableDataManager tableDataManager; switch (tableConfig.getTableType()) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 480b2ba70b..092701bdef 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -19,7 +19,7 @@ package org.apache.pinot.segment.local.data.manager; import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.LoadingCache; +import com.google.common.cache.Cache; import java.io.File; import java.util.List; import java.util.Map; @@ -53,7 +53,7 @@ public interface TableDataManager { */ void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks, TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor, - @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache); + @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache); /** * Returns the instance id of the server. diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index ed5e087b41..4cf21a61fb 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -19,9 +19,8 @@ package org.apache.pinot.server.starter.helix; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.File; import java.io.IOException; @@ -42,10 +41,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; -import org.apache.helix.model.ExternalView; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.ZKMetadataProvider; @@ -72,6 +68,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,12 +91,14 @@ public class HelixInstanceDataManager implements InstanceDataManager { private ZkHelixPropertyStore<ZNRecord> _propertyStore; private SegmentUploader _segmentUploader; private Supplier<Boolean> _isServerReadyToServeQueries = () -> false; - private long _externalViewDroppedMaxWaitMs; - private long _externalViewDroppedCheckInternalMs; // Fixed size LRU cache for storing last N errors on the instance. // Key is TableNameWithType-SegmentName pair. - private LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache; + private Cache<Pair<String, String>, SegmentErrorInfo> _errorCache; + // Cache for recently deleted tables to prevent re-creating them accidentally. + // Key is table name with type, value is deletion time. + protected Cache<String, Long> _recentlyDeletedTables; + private ExecutorService _segmentRefreshExecutor; private ExecutorService _segmentPreloadExecutor; @@ -121,9 +120,6 @@ public class HelixInstanceDataManager implements InstanceDataManager { _segmentUploader = new PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(), ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), serverMetrics); - _externalViewDroppedMaxWaitMs = _instanceDataManagerConfig.getExternalViewDroppedMaxWaitMs(); - _externalViewDroppedCheckInternalMs = _instanceDataManagerConfig.getExternalViewDroppedCheckIntervalMs(); - File instanceDataDir = new File(_instanceDataManagerConfig.getInstanceDataDir()); initInstanceDataDir(instanceDataDir); @@ -152,15 +148,10 @@ public class HelixInstanceDataManager implements InstanceDataManager { } LOGGER.info("Initialized Helix instance data manager"); - // Initialize the error cache - _errorCache = CacheBuilder.newBuilder().maximumSize(_instanceDataManagerConfig.getErrorCacheSize()) - .build(new CacheLoader<>() { - @Override - public SegmentErrorInfo load(Pair<String, String> tableSegmentPair) { - // This cache is populated only via the put api. - return null; - } - }); + // Initialize the error cache and recently deleted tables cache + _errorCache = CacheBuilder.newBuilder().maximumSize(_instanceDataManagerConfig.getErrorCacheSize()).build(); + _recentlyDeletedTables = CacheBuilder.newBuilder() + .expireAfterWrite(_instanceDataManagerConfig.getDeletedTablesCacheTtlMinutes(), TimeUnit.MINUTES).build(); } private void initInstanceDataDir(File instanceDataDir) { @@ -228,9 +219,15 @@ public class HelixInstanceDataManager implements InstanceDataManager { } @Override - public void deleteTable(String tableNameWithType) + public void deleteTable(String tableNameWithType, long deletionTimeMs) throws Exception { - TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); + AtomicReference<TableDataManager> tableDataManagerRef = new AtomicReference<>(); + _tableDataManagerMap.computeIfPresent(tableNameWithType, (k, v) -> { + _recentlyDeletedTables.put(k, deletionTimeMs); + tableDataManagerRef.set(v); + return null; + }); + TableDataManager tableDataManager = tableDataManagerRef.get(); if (tableDataManager == null) { LOGGER.warn("Failed to find table data manager for table: {}, skip deleting the table", tableNameWithType); return; @@ -238,35 +235,6 @@ public class HelixInstanceDataManager implements InstanceDataManager { LOGGER.info("Shutting down table data manager for table: {}", tableNameWithType); tableDataManager.shutDown(); LOGGER.info("Finished shutting down table data manager for table: {}", tableNameWithType); - - try { - // Wait for external view to disappear or become empty before removing the table data manager. - // - // When creating the table, controller will check whether the external view exists, and allow table creation only - // if it doesn't exist. If the table is recreated just after external view disappeared, there is a small chance - // that server won't realize the external view is removed because it is recreated before the server checks it. In - // order to handle this scenario, we want to remove the table data manager when the external view exists but is - // empty. - HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor(); - PropertyKey externalViewKey = helixDataAccessor.keyBuilder().externalView(tableNameWithType); - long endTimeMs = System.currentTimeMillis() + _externalViewDroppedMaxWaitMs; - do { - ExternalView externalView = helixDataAccessor.getProperty(externalViewKey); - if (externalView == null) { - LOGGER.info("ExternalView is dropped for table: {}", tableNameWithType); - return; - } - if (externalView.getRecord().getMapFields().isEmpty()) { - LOGGER.info("ExternalView is empty for table: {}", tableNameWithType); - return; - } - Thread.sleep(_externalViewDroppedCheckInternalMs); - } while (System.currentTimeMillis() < endTimeMs); - LOGGER.warn("ExternalView still exists after {}ms for table: {}", _externalViewDroppedMaxWaitMs, - tableNameWithType); - } finally { - _tableDataManagerMap.remove(tableNameWithType); - } } @Override @@ -284,8 +252,26 @@ public class HelixInstanceDataManager implements InstanceDataManager { private TableDataManager createTableDataManager(String tableNameWithType) { LOGGER.info("Creating table data manager for table: {}", tableNameWithType); - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); - Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); + TableConfig tableConfig; + Long tableDeleteTimeMs = _recentlyDeletedTables.getIfPresent(tableNameWithType); + if (tableDeleteTimeMs != null) { + long currentTimeMs = System.currentTimeMillis(); + LOGGER.info("Table: {} was recently deleted (deleted {}ms ago), checking table config creation timestamp", + tableNameWithType, currentTimeMs - tableDeleteTimeMs); + Pair<TableConfig, Stat> tableConfigAndStat = + ZKMetadataProvider.getTableConfigWithStat(_propertyStore, tableNameWithType); + Preconditions.checkState(tableConfigAndStat != null, "Failed to find table config for table: %s", + tableNameWithType); + tableConfig = tableConfigAndStat.getLeft(); + long tableCreationTimeMs = tableConfigAndStat.getRight().getCtime(); + Preconditions.checkState(tableCreationTimeMs > tableDeleteTimeMs, + "Table: %s was recently deleted (deleted %dms ago) but the table config was created before that (created " + + "%dms ago)", tableNameWithType, currentTimeMs - tableDeleteTimeMs, currentTimeMs - tableCreationTimeMs); + _recentlyDeletedTables.invalidate(tableNameWithType); + } else { + tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); + } TableDataManager tableDataManager = _tableDataManagerProvider.getTableDataManager(tableConfig, _segmentPreloadExecutor, _errorCache, _isServerReadyToServeQueries); @@ -315,8 +301,12 @@ public class HelixInstanceDataManager implements InstanceDataManager { if (tableDataManager != null) { tableDataManager.offloadSegment(segmentName); } else { - LOGGER.warn("Failed to find data manager for table: {}, skipping offloading segment: {}", tableNameWithType, - segmentName); + if (_recentlyDeletedTables.getIfPresent(tableNameWithType) != null) { + LOGGER.info("Table: {} was recently deleted, skipping offloading segment: {}", tableNameWithType, segmentName); + } else { + LOGGER.warn("Failed to find data manager for table: {}, skipping offloading segment: {}", tableNameWithType, + segmentName); + } } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index ea90b9a6b7..a959e0b509 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -126,24 +126,19 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig // Size of cache that holds errors. private static final String ERROR_CACHE_SIZE = "error.cache.size"; + private static final String DELETED_TABLES_CACHE_TTL_MINUTES = "table.deleted.tables.cache.ttl.minutes"; private static final String DELETED_SEGMENTS_CACHE_SIZE = "table.deleted.segments.cache.size"; private static final String DELETED_SEGMENTS_CACHE_TTL_MINUTES = "table.deleted.segments.cache.ttl.minutes"; private static final String PEER_DOWNLOAD_SCHEME = "peer.download.scheme"; - // Check if the external view is dropped for a table, and if so, wait for the external view to - // be updated for a maximum of this time. - private static final String EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = "external.view.dropped.max.wait.ms"; - private static final String EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS = "external.view.dropped.check.interval.ms"; - public static final String UPLOAD_SEGMENT_TO_DEEP_STORE = "segment.upload.to.deep.store"; public static final boolean DEFAULT_UPLOAD_SEGMENT_TO_DEEP_STORE = false; private final static String[] REQUIRED_KEYS = {INSTANCE_ID}; private static final long DEFAULT_ERROR_CACHE_SIZE = 100L; + private static final int DEFAULT_DELETED_TABLES_CACHE_TTL_MINUTES = 60; private static final int DEFAULT_DELETED_SEGMENTS_CACHE_SIZE = 10_000; private static final int DEFAULT_DELETED_SEGMENTS_CACHE_TTL_MINUTES = 2; - public static final long DEFAULT_EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = 20 * 60_000L; - public static final long DEFAULT_EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS = 1_000L; private final PinotConfiguration _serverConfig; private final PinotConfiguration _upsertConfig; @@ -293,6 +288,11 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT); } + @Override + public int getDeletedTablesCacheTtlMinutes() { + return _serverConfig.getProperty(DELETED_TABLES_CACHE_TTL_MINUTES, DEFAULT_DELETED_TABLES_CACHE_TTL_MINUTES); + } + @Override public int getDeletedSegmentsCacheSize() { return _serverConfig.getProperty(DELETED_SEGMENTS_CACHE_SIZE, DEFAULT_DELETED_SEGMENTS_CACHE_SIZE); @@ -308,17 +308,6 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig return _serverConfig.getProperty(PEER_DOWNLOAD_SCHEME); } - @Override - public long getExternalViewDroppedMaxWaitMs() { - return _serverConfig.getProperty(EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS, DEFAULT_EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS); - } - - @Override - public long getExternalViewDroppedCheckIntervalMs() { - return _serverConfig.getProperty(EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS, - DEFAULT_EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS); - } - @Override public PinotConfiguration getUpsertConfig() { return _upsertConfig; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java index 0b7111a75e..d08aa790c0 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java @@ -178,9 +178,14 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { public HelixTaskResult handleMessage() throws InterruptedException { HelixTaskResult helixTaskResult = new HelixTaskResult(); - _logger.info("Handling table deletion message"); + _logger.info("Handling table deletion message: {}", _message); try { - _instanceDataManager.deleteTable(_tableNameWithType); + long deletionTimeMs = _message.getCreateTimeStamp(); + if (deletionTimeMs <= 0) { + _logger.warn("Invalid deletion time: {}, using current time as deletion time", deletionTimeMs); + deletionTimeMs = System.currentTimeMillis(); + } + _instanceDataManager.deleteTable(_tableNameWithType, deletionTimeMs); helixTaskResult.setSuccess(true); } catch (Exception e) { _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETE_TABLE_FAILURES, 1); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java index 152e47ac95..c9d406d19e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java @@ -63,16 +63,14 @@ public interface InstanceDataManagerConfig { long getStreamSegmentDownloadUntarRateLimit(); + int getDeletedTablesCacheTtlMinutes(); + int getDeletedSegmentsCacheSize(); int getDeletedSegmentsCacheTtlMinutes(); String getSegmentPeerDownloadScheme(); - long getExternalViewDroppedMaxWaitMs(); - - long getExternalViewDroppedCheckIntervalMs(); - PinotConfiguration getUpsertConfig(); PinotConfiguration getAuthConfig(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org