This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2904f1bce1 Optimize DimensionTableDataManager to abort unnecessary loading (#11192) 2904f1bce1 is described below commit 2904f1bce19f93b4e6f2f6492e741a5adc02dd82 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Jul 27 13:51:05 2023 -0700 Optimize DimensionTableDataManager to abort unnecessary loading (#11192) --- .../manager/offline/DimensionTableDataManager.java | 114 ++++++++++++++------- .../manager/offline/FastLookupDimensionTable.java | 9 +- .../offline/MemoryOptimizedDimensionTable.java | 18 ++-- .../segment/readers/PinotSegmentRecordReader.java | 4 + 4 files changed, 93 insertions(+), 52 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index 7031c135dc..a9635f2a66 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -20,12 +20,15 @@ package org.apache.pinot.core.data.manager.offline; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections.CollectionUtils; import org.apache.pinot.common.metadata.ZKMetadataProvider; @@ -77,10 +80,12 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager { return INSTANCES.get(tableNameWithType); } - private static final AtomicReferenceFieldUpdater<DimensionTableDataManager, DimensionTable> UPDATER = - AtomicReferenceFieldUpdater.newUpdater(DimensionTableDataManager.class, DimensionTable.class, "_dimensionTable"); + private final AtomicReference<DimensionTable> _dimensionTable = new AtomicReference<>(); + + // Assign a token when loading the lookup table, cancel the loading when token changes because we will load it again + // anyway + private final AtomicInteger _loadToken = new AtomicInteger(); - private volatile DimensionTable _dimensionTable; private boolean _disablePreload = false; @Override @@ -102,10 +107,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager { } if (_disablePreload) { - _dimensionTable = new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, Collections.emptyMap(), - Collections.emptyList(), this); + _dimensionTable.set( + new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, Collections.emptyMap(), Collections.emptyList(), + Collections.emptyList(), this)); } else { - _dimensionTable = new FastLookupDimensionTable(schema, primaryKeyColumns, new HashMap<>()); + _dimensionTable.set(new FastLookupDimensionTable(schema, primaryKeyColumns, new HashMap<>())); } } @@ -114,8 +120,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager { super.addSegment(immutableSegment); String segmentName = immutableSegment.getSegmentName(); try { - loadLookupTable(); - _logger.info("Successfully loaded lookup table: {} after adding segment: {}", _tableNameWithType, segmentName); + if (loadLookupTable()) { + _logger.info("Successfully loaded lookup table after adding segment: {}", segmentName); + } else { + _logger.info("Skip loading lookup table after adding segment: {}, another loading in progress", segmentName); + } } catch (Exception e) { throw new RuntimeException( String.format("Caught exception while loading lookup table: %s 
after adding segment: %s", _tableNameWithType, @@ -127,8 +136,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager { public void removeSegment(String segmentName) { super.removeSegment(segmentName); try { - loadLookupTable(); - _logger.info("Successfully loaded lookup table: {} after removing segment: {}", _tableNameWithType, segmentName); + if (loadLookupTable()) { + _logger.info("Successfully loaded lookup table after removing segment: {}", segmentName); + } else { + _logger.info("Skip loading lookup table after removing segment: {}, another loading in progress", segmentName); + } } catch (Exception e) { throw new RuntimeException( String.format("Caught exception while loading lookup table: %s after removing segment: %s", @@ -138,39 +150,39 @@ public class DimensionTableDataManager extends OfflineTableDataManager { @Override protected void doShutdown() { - closeDimensionTable(_dimensionTable); + closeDimensionTable(_dimensionTable.get()); } private void closeDimensionTable(DimensionTable dimensionTable) { try { dimensionTable.close(); } catch (Exception e) { - _logger.warn("Cannot close dimension table: {}", _tableNameWithType, e); + _logger.error("Caught exception while closing the dimension table", e); } } /** * `loadLookupTable()` reads contents of the DimensionTable into _lookupTable HashMap for fast lookup. */ - private void loadLookupTable() { - DimensionTable snapshot; - DimensionTable replacement; - do { - snapshot = _dimensionTable; - if (_disablePreload) { - replacement = createMemOptimisedDimensionTable(); - } else { - replacement = createFastLookupDimensionTable(); - } - } while (!UPDATER.compareAndSet(this, snapshot, replacement)); - - closeDimensionTable(snapshot); + private boolean loadLookupTable() { + DimensionTable dimensionTable = + _disablePreload ? 
createMemOptimisedDimensionTable() : createFastLookupDimensionTable(); + if (dimensionTable != null) { + closeDimensionTable(_dimensionTable.getAndSet(dimensionTable)); + return true; + } else { + return false; + } } + @Nullable private DimensionTable createFastLookupDimensionTable() { + // Acquire a token in the beginning. Abort the loading and return null when the token changes because another + // loading is in progress. + int token = _loadToken.incrementAndGet(); + Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType); - List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); @@ -185,6 +197,10 @@ public class DimensionTableDataManager extends OfflineTableDataManager { try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) { recordReader.init(indexSegment); for (int i = 0; i < numTotalDocs; i++) { + if (_loadToken.get() != token) { + // Token changed during the loading, abort the loading + return null; + } GenericRow row = new GenericRow(); recordReader.getRecord(i, row); lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row); @@ -203,16 +219,22 @@ public class DimensionTableDataManager extends OfflineTableDataManager { } } + @Nullable private DimensionTable createMemOptimisedDimensionTable() { + // Acquire a token in the beginning. Abort the loading and return null when the token changes because another + // loading is in progress. 
+ int token = _loadToken.incrementAndGet(); + Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType); - List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); + int numPrimaryKeyColumns = primaryKeyColumns.size(); Map<PrimaryKey, LookupRecordLocation> lookupTable = new HashMap<>(); List<SegmentDataManager> segmentDataManagers = acquireAllSegments(); + List<PinotSegmentRecordReader> recordReaders = new ArrayList<>(segmentDataManagers.size()); for (SegmentDataManager segmentManager : segmentDataManagers) { IndexSegment indexSegment = segmentManager.getSegment(); int numTotalDocs = indexSegment.getSegmentMetadata().getTotalDocs(); @@ -220,10 +242,28 @@ public class DimensionTableDataManager extends OfflineTableDataManager { try { PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader(); recordReader.init(indexSegment); + recordReaders.add(recordReader); for (int i = 0; i < numTotalDocs; i++) { - GenericRow row = new GenericRow(); - recordReader.getRecord(i, row); - lookupTable.put(row.getPrimaryKey(primaryKeyColumns), new LookupRecordLocation(recordReader, i)); + if (_loadToken.get() != token) { + // Token changed during the loading, abort the loading + for (PinotSegmentRecordReader reader : recordReaders) { + try { + reader.close(); + } catch (Exception e) { + _logger.error("Caught exception while closing record reader for segment: {}", reader.getSegmentName(), + e); + } + } + for (SegmentDataManager dataManager : segmentDataManagers) { + releaseSegment(dataManager); + } + return null; + } + Object[] values = new Object[numPrimaryKeyColumns]; + for (int j = 0; j < numPrimaryKeyColumns; j++) { + values[j] = recordReader.getValue(i, 
primaryKeyColumns.get(j)); + } + lookupTable.put(new PrimaryKey(values), new LookupRecordLocation(recordReader, i)); } } catch (Exception e) { throw new RuntimeException( @@ -231,23 +271,23 @@ public class DimensionTableDataManager extends OfflineTableDataManager { } } } - return new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, lookupTable, - segmentDataManagers, this); + return new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, lookupTable, segmentDataManagers, recordReaders, + this); } public boolean isPopulated() { - return !_dimensionTable.isEmpty(); + return !_dimensionTable.get().isEmpty(); } public GenericRow lookupRowByPrimaryKey(PrimaryKey pk) { - return _dimensionTable.get(pk); + return _dimensionTable.get().get(pk); } public FieldSpec getColumnFieldSpec(String columnName) { - return _dimensionTable.getFieldSpecFor(columnName); + return _dimensionTable.get().getFieldSpecFor(columnName); } public List<String> getPrimaryKeyColumns() { - return _dimensionTable.getPrimaryKeyColumns(); + return _dimensionTable.get().getPrimaryKeyColumns(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java index ae6776aba3..81798e4cce 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.data.manager.offline; -import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.pinot.spi.data.FieldSpec; @@ -27,9 +26,8 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.PrimaryKey; -class FastLookupDimensionTable implements DimensionTable { - - private Map<PrimaryKey, GenericRow> _lookupTable; +public class 
FastLookupDimensionTable implements DimensionTable { + private final Map<PrimaryKey, GenericRow> _lookupTable; private final Schema _tableSchema; private final List<String> _primaryKeyColumns; @@ -61,7 +59,6 @@ class FastLookupDimensionTable implements DimensionTable { } @Override - public void close() - throws IOException { + public void close() { } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java index ecaa4342b8..96fe847e54 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java @@ -18,11 +18,11 @@ */ package org.apache.pinot.core.data.manager.offline; -import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -31,7 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class MemoryOptimizedDimensionTable implements DimensionTable { +public class MemoryOptimizedDimensionTable implements DimensionTable { private static final Logger LOGGER = LoggerFactory.getLogger(MemoryOptimizedDimensionTable.class); private final Map<PrimaryKey, LookupRecordLocation> _lookupTable; @@ -39,15 +39,17 @@ class MemoryOptimizedDimensionTable implements DimensionTable { private final List<String> _primaryKeyColumns; private final ThreadLocal<GenericRow> _reuse = ThreadLocal.withInitial(GenericRow::new); private final List<SegmentDataManager> _segmentDataManagers; + 
private final List<PinotSegmentRecordReader> _recordReaders; private final TableDataManager _tableDataManager; MemoryOptimizedDimensionTable(Schema tableSchema, List<String> primaryKeyColumns, Map<PrimaryKey, LookupRecordLocation> lookupTable, List<SegmentDataManager> segmentDataManagers, - TableDataManager tableDataManager) { + List<PinotSegmentRecordReader> recordReaders, TableDataManager tableDataManager) { _tableSchema = tableSchema; _primaryKeyColumns = primaryKeyColumns; _lookupTable = lookupTable; _segmentDataManagers = segmentDataManagers; + _recordReaders = recordReaders; _tableDataManager = tableDataManager; } @@ -78,16 +80,14 @@ class MemoryOptimizedDimensionTable implements DimensionTable { } @Override - public void close() - throws IOException { - for (LookupRecordLocation lookupRecordLocation : _lookupTable.values()) { + public void close() { + for (PinotSegmentRecordReader recordReader : _recordReaders) { try { - lookupRecordLocation.getPinotSegmentRecordReader().close(); + recordReader.close(); } catch (Exception e) { - LOGGER.warn("Cannot close segment record reader", e); + LOGGER.error("Caught exception while closing record reader for segment: {}", recordReader.getSegmentName(), e); } } - for (SegmentDataManager segmentDataManager : _segmentDataManagers) { _tableDataManager.releaseSegment(segmentDataManager); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java index bdc83791f5..2c7a0f37d5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java @@ -215,6 +215,10 @@ public class PinotSegmentRecordReader implements RecordReader { return reuse; } + public String getSegmentName() { + 
return _indexSegment.getSegmentName(); + } + public void getRecord(int docId, GenericRow buffer) { for (Map.Entry<String, PinotSegmentColumnReader> entry : _columnReaderMap.entrySet()) { String column = entry.getKey(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org