This is an automated email from the ASF dual-hosted git repository. sunithabeeram pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new d4f2ece Set processingException when all queried segments cannot be acquired (#3942) d4f2ece is described below commit d4f2ecef660ab1d4efa9696a53b0623aac867c3f Author: Sunitha Beeram <sbee...@linkedin.com> AuthorDate: Fri Mar 22 09:33:09 2019 -0700 Set processingException when all queried segments cannot be acquired (#3942) * Set processingException when all queried segments cannot be acquired * Minor fixes * Address review comments * Disable setting processing exception until metrics are validated * Address review comments * Address review comments * Address review comments * Address review comments * Address review comments: Make InstanceDataManager symmetric with respect to tracking segments added and deleted --- .../pinot/common/exception/QueryException.java | 2 + .../core/data/manager/BaseTableDataManager.java | 48 +++++++++++++++++---- .../core/data/manager/InstanceDataManager.java | 14 ++++++ .../pinot/core/data/manager/TableDataManager.java | 15 +++++++ .../query/executor/ServerQueryExecutorV1Impl.java | 33 +++++++++++++- .../data/manager/BaseTableDataManagerTest.java | 5 +++ .../pinot/query/executor/QueryExecutorTest.java | 50 ++++++++++++++++++++-- .../starter/helix/HelixInstanceDataManager.java | 16 +++++++ .../SegmentOnlineOfflineStateModelFactory.java | 5 +++ 9 files changed, 175 insertions(+), 13 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java index e0ed903..6c45897 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java @@ -41,6 +41,7 @@ public class QueryException { public static final int SEGMENT_PLAN_EXECUTION_ERROR_CODE = 160; public static final int COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE = 170; public static 
final int ACCESS_DENIED_ERROR_CODE = 180; + public static final int SEGMENTS_MISSING_ERROR_CODE = 190; public static final int QUERY_EXECUTION_ERROR_CODE = 200; // TODO: Handle these errors in broker public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210; @@ -96,6 +97,7 @@ public class QueryException { public static final ProcessingException QUERY_VALIDATION_ERROR = new ProcessingException(QUERY_VALIDATION_ERROR_CODE); public static final ProcessingException UNKNOWN_ERROR = new ProcessingException(UNKNOWN_ERROR_CODE); public static final ProcessingException QUOTA_EXCEEDED_ERROR = new ProcessingException(TOO_MANY_REQUESTS_ERROR_CODE); + public static final ProcessingException SEGMENTS_MISSING_ERROR = new ProcessingException(SEGMENTS_MISSING_ERROR_CODE); static { JSON_PARSING_ERROR.setMessage("JsonParsingError"); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index a456691..3f074a1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -19,10 +19,13 @@ package org.apache.pinot.core.data.manager; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.apache.helix.ZNRecord; @@ -42,9 +45,13 @@ import org.slf4j.LoggerFactory; @ThreadSafe public abstract class BaseTableDataManager implements TableDataManager { private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDataManager.class); + // cache deleted segment names for at most this duration + private static
final int MAX_CACHE_DURATION_SEC = 6 * 3600; // 6 hours protected final ConcurrentHashMap<String, SegmentDataManager> _segmentDataManagerMap = new ConcurrentHashMap<>(); + protected Cache<String, Boolean> _deletedSegmentsCache; + protected TableDataManagerConfig _tableDataManagerConfig; protected String _instanceId; protected ZkHelixPropertyStore<ZNRecord> _propertyStore; @@ -59,6 +66,7 @@ public abstract class BaseTableDataManager implements TableDataManager { @Nonnull ZkHelixPropertyStore<ZNRecord> propertyStore, @Nonnull ServerMetrics serverMetrics) { LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName()); + _deletedSegmentsCache = CacheBuilder.newBuilder().expireAfterWrite(MAX_CACHE_DURATION_SEC, TimeUnit.SECONDS).build(); _tableDataManagerConfig = tableDataManagerConfig; _instanceId = instanceId; _propertyStore = propertyStore; @@ -117,6 +125,8 @@ public abstract class BaseTableDataManager implements TableDataManager { ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment); SegmentDataManager oldSegmentManager = _segmentDataManagerMap.put(segmentName, newSegmentManager); + + // release old segment if needed if (oldSegmentManager == null) { _logger.info("Added new immutable segment: {} to table: {}", segmentName, _tableNameWithType); } else { @@ -156,6 +166,35 @@ public abstract class BaseTableDataManager implements TableDataManager { } } + /** + * Called when a segment is deleted. The actual handling of segment delete is outside of this method. + * This method provides book-keeping around deleted segments. + * @param segmentName name of the segment to track. + */ + public void notifySegmentDeleted(@Nonnull String segmentName) { + // add segment to the cache + _deletedSegmentsCache.put(segmentName, true); + } + + /** + * Check if a segment is recently deleted. + * + * @param segmentName name of the segment to check. 
+ * @return true if segment is in the cache, false otherwise + */ + public boolean isRecentlyDeleted(@Nonnull String segmentName) { + return _deletedSegmentsCache.getIfPresent(segmentName) != null; + } + + /** + * Remove a segment from the deleted cache if it is being added back. + * + * @param segmentName name of the segment that needs to be removed from the cache (if needed) + */ + public void notifySegmentAdded(@Nonnull String segmentName) { + _deletedSegmentsCache.invalidate(segmentName); + } + + @Nonnull @Override public List<SegmentDataManager> acquireAllSegments() { @@ -176,8 +215,6 @@ public abstract class BaseTableDataManager implements TableDataManager { SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) { segmentDataManagers.add(segmentDataManager); - } else { - handleMissingSegment(segmentName); } } return segmentDataManagers; @@ -189,17 +226,10 @@ public abstract class BaseTableDataManager implements TableDataManager { if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) { return segmentDataManager; } else { - handleMissingSegment(segmentName); return null; } } - private void handleMissingSegment(String segmentName) { - // could not find segment - LOGGER.error("Could not find segment " + segmentName + " for table " + _tableNameWithType); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, 1); - } - @Override public void releaseSegment(@Nonnull SegmentDataManager segmentDataManager) { if (segmentDataManager.decreaseReferenceCount()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index 85090dc..9399c7a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -79,6 +79,20 @@ public interface InstanceDataManager { throws Exception; /** + * Handles addition of a segment to the table. + * + * This method performs book keeping of added segments, especially if the deleted-cache needs to be invalidated. + */ + void notifySegmentAdded(@Nonnull String tableNameWithType, @Nonnull String segmentName); + + /** + * Handles deletion of a segment from the table. + * + * This method performs book keeping of deleted segments. + */ + void notifySegmentDeleted(@Nonnull String tableNameWithType, @Nonnull String segmentName); + + /** * Reloads a segment in a table. */ void reloadSegment(@Nonnull String tableNameWithType, @Nonnull String segmentName) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java index c237fbf..16eae6b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java @@ -80,6 +80,21 @@ public interface TableDataManager { void removeSegment(@Nonnull String segmentName); /** + * Track a deleted segment. + */ + void notifySegmentDeleted(@Nonnull String segmentName); + + /** + * Track addition of a segment. + */ + void notifySegmentAdded(@Nonnull String segmentName); + + /** + * Check if a segment is recently deleted. + */ + boolean isRecentlyDeleted(@Nonnull String segmentName); + + /** * Acquires all segments of the table. * <p>It is the caller's responsibility to return the segments by calling {@link #releaseSegment(SegmentDataManager)}.
* diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 33e0db5..4ab000c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.query.executor; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -124,7 +125,24 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); Preconditions.checkState(tableDataManager != null, "Failed to find data manager for table: " + tableNameWithType); - List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(queryRequest.getSegmentsToQuery()); + + // acquire the segments + int missingSegments = 0; + List<String> segmentsToQuery = queryRequest.getSegmentsToQuery(); + List<SegmentDataManager> segmentDataManagers = new ArrayList<>(); + for (String segmentName : segmentsToQuery) { + SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName); + if (segmentDataManager != null) { + segmentDataManagers.add(segmentDataManager); + } else { + if (!tableDataManager.isRecentlyDeleted(segmentName)) { + LOGGER.error("Could not find segment {} for table {} for requestId {}", segmentName, tableNameWithType, + requestId); + missingSegments++; + } + } + } + int numSegmentsQueried = segmentDataManagers.size(); boolean enableTrace = queryRequest.isEnableTrace(); if (enableTrace) { @@ -195,6 +213,19 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { long queryProcessingTime = queryProcessingTimer.getDurationMs(); 
dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, Integer.toString(numSegmentsQueried)); dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, Long.toString(queryProcessingTime)); + + if (missingSegments > 0) { + // TODO: add this exception to the datatable after verifying the metrics + // Currently, given the deleted segments cache is in-memory only, a server restart will reset it + // We might end up sending partial-response metadata in such cases. It appears that the likelihood of + // this occurrence is low; i.e., a segment has to be removed by retention and the server must be restarted while the + // broker view is still behind. We would however like to validate that and/or make this configurable based on + // data. + /*dataTable.addException(QueryException.getException(QueryException.SEGMENTS_MISSING_ERROR, + "Could not find " + missingSegments + " segments on the server"));*/ + _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, missingSegments); + } + LOGGER.debug("Query processing time for request Id - {}: {}", requestId, queryProcessingTime); LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable); return dataTable; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index 9b28f5a..a19b288 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -158,6 +158,11 @@ public class BaseTableDataManagerTest { // Removing the segment again is fine.
tableDataManager.removeSegment(segmentName); + // Delete the segment + tableDataManager.notifySegmentDeleted(segmentName); + // check that it is recorded as deleted + Assert.assertTrue(tableDataManager.isRecentlyDeleted(segmentName)); + // Add a new segment and remove it in order this time. final String anotherSeg = "AnotherSegment"; ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs); diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java index b6ce1e2..a4b3241 100644 --- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java @@ -68,6 +68,7 @@ public class QueryExecutorTest { private final List<ImmutableSegment> _indexSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE); private final List<String> _segmentNames = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE); + private InstanceDataManager _instanceDataManager; private ServerMetrics _serverMetrics; private QueryExecutor _queryExecutor; @@ -105,8 +106,8 @@ public class QueryExecutorTest { for (ImmutableSegment indexSegment : _indexSegments) { tableDataManager.addSegment(indexSegment); } - InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); - when(instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager); + _instanceDataManager = mock(InstanceDataManager.class); + when(_instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager); // Set up the query executor resourceUrl = getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH); @@ -115,7 +116,7 @@ public class QueryExecutorTest { queryExecutorConfig.setDelimiterParsingDisabled(false); queryExecutorConfig.load(new File(resourceUrl.getFile())); _queryExecutor = new ServerQueryExecutorV1Impl(); - _queryExecutor.init(queryExecutorConfig, 
instanceDataManager, _serverMetrics); + _queryExecutor.init(queryExecutorConfig, _instanceDataManager, _serverMetrics); } @Test @@ -154,6 +155,49 @@ public class QueryExecutorTest { Assert.assertEquals(instanceResponse.getDouble(0, 0), 0.0); } + @Test + public void testDeletedSegmentQuery() { + String query = "SELECT count(*) FROM " + TABLE_NAME; + _instanceDataManager.notifySegmentDeleted(TABLE_NAME, _segmentNames.get(0)); + + InstanceRequest instanceRequest = new InstanceRequest(0L, COMPILER.compileToBrokerRequest(query)); + instanceRequest.setSearchSegments(_segmentNames); + DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); + Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L); + + for (String key : instanceResponse.getMetadata().keySet()) { + if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) { + Assert.fail("Response should not contain exceptions"); + } + } + } + + // TODO: enable this when the code is updated to set the exception + @Test(enabled=false) + public void testMissingSegmentQuery() { + String query = "SELECT count(*) FROM " + TABLE_NAME; + + List<String> searchSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE + 1); + searchSegments.addAll(_segmentNames); + searchSegments.add("NON_EXISTENT_SEGMENT"); + + InstanceRequest instanceRequest = new InstanceRequest(0L, COMPILER.compileToBrokerRequest(query)); + instanceRequest.setSearchSegments(searchSegments); + DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); + Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L); + + boolean exception = false; + for (String key : instanceResponse.getMetadata().keySet()) { + if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) { + // "null" below stems from a quirk around how the processing exception is built + Assert.assertEquals("null:\nCould not find 1 segments on the server", instanceResponse.getMetadata().get(key)); + exception = 
true; + } + } + Assert.assertTrue(exception, "Expected missing segment exception"); + } + + @AfterClass public void tearDown() { for (IndexSegment segment : _indexSegments) { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 2e33aa3..d12645d 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -152,6 +152,22 @@ public class HelixInstanceDataManager implements InstanceDataManager { } @Override + public void notifySegmentAdded(@Nonnull String tableNameWithType, @Nonnull String segmentName) { + TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); + if (tableDataManager != null) { + tableDataManager.notifySegmentAdded(segmentName); + } + } + + @Override + public void notifySegmentDeleted(@Nonnull String tableNameWithType, @Nonnull String segmentName) { + TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); + if (tableDataManager != null) { + tableDataManager.notifySegmentDeleted(segmentName); + } + } + + @Override public void reloadSegment(@Nonnull String tableNameWithType, @Nonnull String segmentName) throws Exception { LOGGER.info("Reloading single segment: {} in table: {}", segmentName, tableNameWithType); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index eea3b57..d915373 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -166,6 
+166,8 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta } else { _instanceDataManager.addRealtimeSegment(tableNameWithType, segmentName); } + // handle any book-keeping after a segment is added + _instanceDataManager.notifySegmentAdded(tableNameWithType, segmentName); } catch (Exception e) { _logger.error("Caught exception in state transition from OFFLINE -> ONLINE for resource: {}, partition: {}", tableNameWithType, segmentName, e); @@ -196,6 +198,9 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta String tableNameWithType = message.getResourceName(); String segmentName = message.getPartitionName(); + // handle any additional book-keeping that needs to be done when a segment is dropped + _instanceDataManager.notifySegmentDeleted(tableNameWithType, segmentName); + // This method might modify the file on disk. Use segment lock to prevent race condition Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName); try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org