This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f3ec75972a Optimize initialization time and memory usage of dimension table (#15130) f3ec75972a is described below commit f3ec75972aae72aeffef833d4a8d5bc62fa461e7 Author: Bolek Ziobrowski <26925920+bziobrow...@users.noreply.github.com> AuthorDate: Tue Mar 25 11:52:24 2025 +0100 Optimize initialization time and memory usage of dimension table (#15130) * Optimize creation and memory usage of eagerly-loaded dimension table. * Add missing header. * Trigger CI. * Optimize memory usage and initialization time of memory-optimized dimension table. * Fix bad conditional. * Fixed dimension table shutdown. --- .../controller/helix/ControllerRequestClient.java | 8 +- .../pinot/controller/helix/ControllerTest.java | 5 + .../core/data/manager/offline/DimensionTable.java | 7 + .../manager/offline/DimensionTableDataManager.java | 126 +++++++-- .../manager/offline/FastLookupDimensionTable.java | 83 +++++- .../offline/MemoryOptimizedDimensionTable.java | 51 +++- .../offline/DimensionTableDataManagerTest.java | 47 +++- .../tests/DimensionTableIntegrationTest.java | 169 ++++++++++++ .../perf/BenchmarkDimensionTableOverhead.java | 291 +++++++++++++++++++++ .../query/runtime/operator/LookupJoinOperator.java | 17 +- .../segment/readers/PinotSegmentRecordReader.java | 45 +++- .../utils/builder/ControllerRequestURLBuilder.java | 10 +- 12 files changed, 790 insertions(+), 69 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java index 311a1caada..754367fef5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java @@ -139,9 +139,15 @@ public class ControllerRequestClient { public void deleteTable(String tableNameWithType) throws IOException { + deleteTable(tableNameWithType, null); + } + + public void deleteTable(String tableNameWithType, String retentionPeriod) + throws IOException { try { HttpClient.wrapAndThrowHttpException( - _httpClient.sendDeleteRequest(new URI(_controllerRequestURLBuilder.forTableDelete(tableNameWithType)), + _httpClient.sendDeleteRequest( + new URI(_controllerRequestURLBuilder.forTableDelete(tableNameWithType, retentionPeriod)), _headers)); } catch (HttpErrorStatusException | URISyntaxException e) { throw new IOException(e); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 5b213da026..7a00a4d8e4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -702,6 +702,11 @@ public class ControllerTest { getControllerRequestClient().deleteTable(TableNameBuilder.OFFLINE.tableNameWithType(tableName)); } + public void dropOfflineTable(String tableName, String retentionPeriod) + throws IOException { + getControllerRequestClient().deleteTable(TableNameBuilder.OFFLINE.tableNameWithType(tableName), retentionPeriod); + } + public void dropRealtimeTable(String tableName) throws IOException { getControllerRequestClient().deleteTable(TableNameBuilder.REALTIME.tableNameWithType(tableName)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java index 92637a45fd..07c2f7ade8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java @@ -37,6 +37,13 @@ public interface DimensionTable extends Closeable { boolean containsKey(PrimaryKey pk); + /** + * Deprecated because GenericRow is an inefficient data structure. + * Use getValue() or getValues() instead. + * @param pk primary key + * @return primary key and value as GenericRow. + */ + @Deprecated @Nullable GenericRow getRow(PrimaryKey pk); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index 6ed4edfd48..5e36d847e4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -20,11 +20,15 @@ package org.apache.pinot.core.data.manager.offline; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.Hash; +import it.unimi.dsi.fastutil.objects.Object2LongOpenCustomHashMap; +import it.unimi.dsi.fastutil.objects.Object2ObjectOpenCustomHashMap; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -59,6 +63,17 @@ public class DimensionTableDataManager extends OfflineTableDataManager { // Storing singletons per table in a map private static final Map<String, DimensionTableDataManager> INSTANCES = new ConcurrentHashMap<>(); + public static final Hash.Strategy<Object[]> HASH_STRATEGY = new Hash.Strategy<>() { + @Override + public int hashCode(Object[] o) { + return Arrays.hashCode(o); + } + + @Override + public boolean equals(Object[] a, Object[] b) { + return Arrays.equals(a, b); + } + }; private DimensionTableDataManager() { } @@ -110,11 +125,19 @@ public class DimensionTableDataManager extends OfflineTableDataManager { } if (_disablePreload) { + Object2LongOpenCustomHashMap<Object[]> lookupTable = new Object2LongOpenCustomHashMap<>(HASH_STRATEGY); + lookupTable.defaultReturnValue(Long.MIN_VALUE); + _dimensionTable.set( - new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, Collections.emptyMap(), Collections.emptyList(), + new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, lookupTable, Collections.emptyList(), Collections.emptyList(), this)); } else { - _dimensionTable.set(new FastLookupDimensionTable(schema, primaryKeyColumns, new HashMap<>())); + List<String> valueColumns = getValueColumns(schema.getColumnNames(), primaryKeyColumns); + + Object2ObjectOpenCustomHashMap<Object[], Object[]> lookupTable = + new Object2ObjectOpenCustomHashMap<>(HASH_STRATEGY); + + _dimensionTable.set(new FastLookupDimensionTable(schema, primaryKeyColumns, valueColumns, lookupTable)); } } @@ -151,8 +174,10 @@ public class DimensionTableDataManager extends OfflineTableDataManager { protected void doShutdown() { releaseAndRemoveAllSegments(); closeDimensionTable(_dimensionTable.get()); + INSTANCES.remove(_tableNameWithType); } + private void closeDimensionTable(DimensionTable dimensionTable) { try { dimensionTable.close(); @@ -187,36 +212,53 @@ public class DimensionTableDataManager extends OfflineTableDataManager { Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); - Map<PrimaryKey, GenericRow> lookupTable = new HashMap<>(); List<SegmentDataManager> segmentDataManagers = acquireAllSegments(); try { + // count all documents to limit map re-sizings + int totalDocs = 0; + for (SegmentDataManager segmentManager : segmentDataManagers) { + IndexSegment indexSegment = segmentManager.getSegment(); + totalDocs += indexSegment.getSegmentMetadata().getTotalDocs(); + } + + Object2ObjectOpenCustomHashMap<Object[], Object[]> lookupTable = + new Object2ObjectOpenCustomHashMap<>(totalDocs, HASH_STRATEGY); + + List<String> valueColumns = getValueColumns(schema.getColumnNames(), primaryKeyColumns); + for (SegmentDataManager segmentManager : segmentDataManagers) { IndexSegment indexSegment = segmentManager.getSegment(); int numTotalDocs = indexSegment.getSegmentMetadata().getTotalDocs(); if (numTotalDocs > 0) { try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) { recordReader.init(indexSegment); + + int[] pkIndexes = recordReader.getIndexesForColumns(primaryKeyColumns); + int[] valIndexes = recordReader.getIndexesForColumns(valueColumns); + for (int i = 0; i < numTotalDocs; i++) { if (_loadToken.get() != token) { // Token changed during the loading, abort the loading return null; } - GenericRow row = new GenericRow(); - recordReader.getRecord(i, row); - GenericRow previousRow = lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row); - if (_errorOnDuplicatePrimaryKey && previousRow != null) { + + Object[] primaryKey = recordReader.getRecordValues(i, pkIndexes); + Object[] values = recordReader.getRecordValues(i, valIndexes); + + Object[] previousValue = lookupTable.put(primaryKey, values); + if (_errorOnDuplicatePrimaryKey && previousValue != null) { throw new IllegalStateException( "Caught exception while reading records from segment: " + indexSegment.getSegmentName() - + "primary key already exist for: " + row.getPrimaryKey(primaryKeyColumns)); + + "primary key already exist for: " + Arrays.toString(primaryKey)); } } } catch (Exception e) { throw new RuntimeException( - "Caught exception while reading records from segment: " + indexSegment.getSegmentName()); + "Caught exception while reading records from segment: " + indexSegment.getSegmentName(), e); } } } - return new FastLookupDimensionTable(schema, primaryKeyColumns, lookupTable); + return new FastLookupDimensionTable(schema, primaryKeyColumns, valueColumns, lookupTable); } finally { for (SegmentDataManager segmentManager : segmentDataManagers) { releaseSegment(segmentManager); @@ -224,6 +266,16 @@ public class DimensionTableDataManager extends OfflineTableDataManager { } } + private static List<String> getValueColumns(NavigableSet<String> columnNames, List<String> primaryKeyColumns) { + List<String> nonPkColumns = new ArrayList<>(columnNames.size() - primaryKeyColumns.size()); + for (String columnName : columnNames) { + if (!primaryKeyColumns.contains(columnName)) { + nonPkColumns.add(columnName); + } + } + return nonPkColumns; + } + @Nullable private DimensionTable createMemOptimisedDimensionTable() { // Acquire a token in the beginning. Abort the loading and return null when the token changes because another @@ -235,40 +287,41 @@ public class DimensionTableDataManager extends OfflineTableDataManager { List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); - int numPrimaryKeyColumns = primaryKeyColumns.size(); - Map<PrimaryKey, LookupRecordLocation> lookupTable = new HashMap<>(); List<SegmentDataManager> segmentDataManagers = acquireAllSegments(); List<PinotSegmentRecordReader> recordReaders = new ArrayList<>(segmentDataManagers.size()); + + int totalDocs = 0; + for (SegmentDataManager segmentManager : segmentDataManagers) { + IndexSegment indexSegment = segmentManager.getSegment(); + totalDocs += indexSegment.getSegmentMetadata().getTotalDocs(); + } + + Object2LongOpenCustomHashMap<Object[]> lookupTable = new Object2LongOpenCustomHashMap<>(totalDocs, HASH_STRATEGY); + lookupTable.defaultReturnValue(Long.MIN_VALUE); + for (SegmentDataManager segmentManager : segmentDataManagers) { IndexSegment indexSegment = segmentManager.getSegment(); int numTotalDocs = indexSegment.getSegmentMetadata().getTotalDocs(); if (numTotalDocs > 0) { try { + int readerIdx = recordReaders.size(); PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader(); recordReader.init(indexSegment); recordReaders.add(recordReader); + int[] pkIndexes = recordReader.getIndexesForColumns(primaryKeyColumns); + for (int i = 0; i < numTotalDocs; i++) { if (_loadToken.get() != token) { // Token changed during the loading, abort the loading - for (PinotSegmentRecordReader reader : recordReaders) { - try { - reader.close(); - } catch (Exception e) { - _logger.error("Caught exception while closing record reader for segment: {}", reader.getSegmentName(), - e); - } - } - for (SegmentDataManager dataManager : segmentDataManagers) { - releaseSegment(dataManager); - } + releaseResources(recordReaders, segmentDataManagers); return null; } - Object[] values = new Object[numPrimaryKeyColumns]; - for (int j = 0; j < numPrimaryKeyColumns; j++) { - values[j] = recordReader.getValue(i, primaryKeyColumns.get(j)); - } - lookupTable.put(new PrimaryKey(values), new LookupRecordLocation(recordReader, i)); + + Object[] primaryKey = recordReader.getRecordValues(i, pkIndexes); + + long readerIdxAndDocId = (((long) readerIdx) << 32) | (i & 0xffffffffL); + lookupTable.put(primaryKey, readerIdxAndDocId); } } catch (Exception e) { throw new RuntimeException( @@ -280,6 +333,21 @@ public class DimensionTableDataManager extends OfflineTableDataManager { this); } + private void releaseResources(List<PinotSegmentRecordReader> recordReaders, + List<SegmentDataManager> segmentDataManagers) { + for (PinotSegmentRecordReader reader : recordReaders) { + try { + reader.close(); + } catch (Exception e) { + _logger.error("Caught exception while closing record reader for segment: {}", reader.getSegmentName(), + e); + } + } + for (SegmentDataManager dataManager : segmentDataManagers) { + releaseSegment(dataManager); + } + } + public boolean isPopulated() { return !_dimensionTable.get().isEmpty(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java index ccbce7439c..a764d73c3b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java @@ -18,8 +18,9 @@ */ package org.apache.pinot.core.data.manager.offline; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import it.unimi.dsi.fastutil.objects.Object2ObjectOpenCustomHashMap; import java.util.List; -import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; @@ -28,15 +29,35 @@ import org.apache.pinot.spi.data.readers.PrimaryKey; public class FastLookupDimensionTable implements DimensionTable { - private final Map<PrimaryKey, GenericRow> _lookupTable; + + private final Object2ObjectOpenCustomHashMap<Object[], Object[]> _lookupTable; private final Schema _tableSchema; private final List<String> _primaryKeyColumns; + private final List<String> _valueColumns; + private final int _keysNum; + + private final Object2IntOpenHashMap<String> _columnNamesToIdx; - FastLookupDimensionTable(Schema tableSchema, List<String> primaryKeyColumns, - Map<PrimaryKey, GenericRow> lookupTable) { + FastLookupDimensionTable(Schema tableSchema, + List<String> primaryKeyColumns, + List<String> valueColumns, + Object2ObjectOpenCustomHashMap<Object[], Object[]> lookupTable) { _lookupTable = lookupTable; _tableSchema = tableSchema; _primaryKeyColumns = primaryKeyColumns; + _keysNum = _primaryKeyColumns.size(); + _valueColumns = valueColumns; + + _columnNamesToIdx = new Object2IntOpenHashMap<>(_primaryKeyColumns.size() + valueColumns.size()); + _columnNamesToIdx.defaultReturnValue(Integer.MIN_VALUE); + + int idx = 0; + for (String column : primaryKeyColumns) { + _columnNamesToIdx.put(column, idx++); + } + for (String column : valueColumns) { + _columnNamesToIdx.put(column, idx++); + } } @Override @@ -57,35 +78,71 @@ public class FastLookupDimensionTable implements DimensionTable { @Override public boolean containsKey(PrimaryKey pk) { - return _lookupTable.containsKey(pk); + return _lookupTable.containsKey(pk.getValues()); } + /** + * This method returns GenericRow, which has big memory and cpu overhead. + */ + @Deprecated @Nullable @Override public GenericRow getRow(PrimaryKey pk) { - return _lookupTable.get(pk); + Object[] rawPk = pk.getValues(); + Object[] value = _lookupTable.get(rawPk); + if (value == null) { + return null; + } + + GenericRow row = new GenericRow(); + int pIdx = 0; + for (String column : _primaryKeyColumns) { + row.putValue(column, rawPk[pIdx++]); + } + + int vIdx = 0; + for (String column : _valueColumns) { + row.putValue(column, value[vIdx++]); + } + + return row; } @Nullable @Override public Object getValue(PrimaryKey pk, String columnName) { - GenericRow row = _lookupTable.get(pk); - return row != null ? row.getValue(columnName) : null; + Object[] value = _lookupTable.get(pk.getValues()); + if (value == null) { + return null; + } + + return getValue(pk, columnName, value); + } + + private Object getValue(PrimaryKey pk, String columnName, Object[] values) { + int idx = _columnNamesToIdx.getInt(columnName); + if (idx < 0) { + return null; + } else if (idx < _keysNum) { + return pk.getValues()[idx]; + } else { + return values[idx - _keysNum]; + } } @Nullable @Override public Object[] getValues(PrimaryKey pk, String[] columnNames) { - GenericRow row = _lookupTable.get(pk); - if (row == null) { + Object[] rowValues = _lookupTable.get(pk.getValues()); + if (rowValues == null) { return null; } int numColumns = columnNames.length; - Object[] values = new Object[numColumns]; + Object[] result = new Object[numColumns]; for (int i = 0; i < numColumns; i++) { - values[i] = row.getValue(columnNames[i]); + result[i] = getValue(pk, columnNames[i], rowValues); } - return values; + return result; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java index 1303bf3bd8..b04d9a7805 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java @@ -18,8 +18,8 @@ */ package org.apache.pinot.core.data.manager.offline; +import it.unimi.dsi.fastutil.objects.Object2LongOpenCustomHashMap; import java.util.List; -import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory; public class MemoryOptimizedDimensionTable implements DimensionTable { private static final Logger LOGGER = LoggerFactory.getLogger(MemoryOptimizedDimensionTable.class); - private final Map<PrimaryKey, LookupRecordLocation> _lookupTable; + private final Object2LongOpenCustomHashMap<Object[]> _lookupTable; private final Schema _tableSchema; private final List<String> _primaryKeyColumns; private final ThreadLocal<GenericRow> _reuse = ThreadLocal.withInitial(GenericRow::new); @@ -43,9 +43,12 @@ public class MemoryOptimizedDimensionTable implements DimensionTable { private final List<PinotSegmentRecordReader> _recordReaders; private final TableDataManager _tableDataManager; - MemoryOptimizedDimensionTable(Schema tableSchema, List<String> primaryKeyColumns, - Map<PrimaryKey, LookupRecordLocation> lookupTable, List<SegmentDataManager> segmentDataManagers, - List<PinotSegmentRecordReader> recordReaders, TableDataManager tableDataManager) { + MemoryOptimizedDimensionTable(Schema tableSchema, + List<String> primaryKeyColumns, + Object2LongOpenCustomHashMap<Object[]> lookupTable, + List<SegmentDataManager> segmentDataManagers, + List<PinotSegmentRecordReader> recordReaders, + TableDataManager tableDataManager) { _tableSchema = tableSchema; _primaryKeyColumns = primaryKeyColumns; _lookupTable = lookupTable; @@ -71,39 +74,59 @@ public class MemoryOptimizedDimensionTable implements DimensionTable { @Override public boolean containsKey(PrimaryKey pk) { - return _lookupTable.containsKey(pk); + return _lookupTable.containsKey(pk.getValues()); } @Nullable @Override public GenericRow getRow(PrimaryKey pk) { - LookupRecordLocation lookupRecordLocation = _lookupTable.get(pk); - if (lookupRecordLocation == null) { + long readerIdxAndDocId = _lookupTable.getLong(pk.getValues()); + if (readerIdxAndDocId < 0) { return null; } + GenericRow reuse = _reuse.get(); reuse.clear(); - return lookupRecordLocation.getRecord(reuse); + + int docId = (int) (readerIdxAndDocId & 0xffffffffL); + int readerIdx = (int) (readerIdxAndDocId >> 32); + + PinotSegmentRecordReader recordReader = _recordReaders.get(readerIdx); + recordReader.getRecord(docId, reuse); + return reuse; } @Nullable @Override public Object getValue(PrimaryKey pk, String columnName) { - LookupRecordLocation lookupRecordLocation = _lookupTable.get(pk); - return lookupRecordLocation != null ? lookupRecordLocation.getValue(columnName) : null; + long readerIdxAndDocId = _lookupTable.getLong(pk.getValues()); + if (readerIdxAndDocId < 0) { + return null; + } + + int docId = (int) (readerIdxAndDocId & 0xffffffffL); + int readerIdx = (int) (readerIdxAndDocId >> 32); + + PinotSegmentRecordReader recordReader = _recordReaders.get(readerIdx); + return recordReader.getValue(docId, columnName); } @Nullable @Override public Object[] getValues(PrimaryKey pk, String[] columnNames) { - LookupRecordLocation lookupRecordLocation = _lookupTable.get(pk); - if (lookupRecordLocation == null) { + long readerIdxAndDocId = _lookupTable.getLong(pk.getValues()); + if (readerIdxAndDocId < 0) { return null; } + + int docId = (int) (readerIdxAndDocId & 0xffffffffL); + int readerIdx = (int) (readerIdxAndDocId >> 32); + PinotSegmentRecordReader recordReader = _recordReaders.get(readerIdx); + int numColumns = columnNames.length; Object[] values = new Object[numColumns]; for (int i = 0; i < numColumns; i++) { - values[i] = lookupRecordLocation.getValue(columnNames[i]); + values[i] = recordReader.getValue(docId, columnNames[i]); } return values; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java index 5c8edb06b6..469fd658fc 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java @@ -65,6 +65,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.mockito.Mockito.mock; @@ -129,21 +130,28 @@ public class DimensionTableDataManagerTest { } private Schema getSchema() { - return new Schema.SchemaBuilder().setSchemaName("dimBaseballTeams") - .addSingleValueDimension("teamID", DataType.STRING).addSingleValueDimension("teamName", DataType.STRING) - .setPrimaryKeyColumns(Collections.singletonList("teamID")).build(); + return new Schema.SchemaBuilder() + .setSchemaName("dimBaseballTeams") + .addSingleValueDimension("teamID", DataType.STRING) + .addSingleValueDimension("teamName", DataType.STRING) + .setPrimaryKeyColumns(Collections.singletonList("teamID")) + .build(); } private TableConfig getTableConfig(boolean disablePreload, boolean errorOnDuplicatePrimaryKey) { DimensionTableConfig dimensionTableConfig = new DimensionTableConfig(disablePreload, errorOnDuplicatePrimaryKey); - return new TableConfigBuilder(TableType.OFFLINE).setTableName("dimBaseballTeams") - .setDimensionTableConfig(dimensionTableConfig).build(); + return new TableConfigBuilder(TableType.OFFLINE) + .setTableName("dimBaseballTeams") + .setDimensionTableConfig(dimensionTableConfig) + .build(); } private Schema getSchemaWithExtraColumn() { return new Schema.SchemaBuilder().setSchemaName("dimBaseballTeams") - .addSingleValueDimension("teamID", DataType.STRING).addSingleValueDimension("teamName", DataType.STRING) - .addSingleValueDimension("teamCity", DataType.STRING).setPrimaryKeyColumns(Collections.singletonList("teamID")) + .addSingleValueDimension("teamID", DataType.STRING) + .addSingleValueDimension("teamName", DataType.STRING) + .addSingleValueDimension("teamCity", DataType.STRING) + .setPrimaryKeyColumns(Collections.singletonList("teamID")) .build(); } @@ -350,6 +358,31 @@ public class DimensionTableDataManagerTest { assertNull(tableDataManager.lookupRow(key)); } + @DataProvider(name = "options") + private Object[] getOptions() { + return new Boolean[]{true, false}; + } + + @Test(dataProvider = "options") + public void testDeleteTableRemovesManagerFromMemory(boolean disablePreload) + throws Exception { + HelixManager helixManager = mock(HelixManager.class); + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn( + SchemaUtils.toZNRecord(getSchema())); + when(propertyStore.get("/CONFIGS/TABLE/dimBaseballTeams_OFFLINE", null, AccessOption.PERSISTENT)).thenReturn( + TableConfigUtils.toZNRecord(getTableConfig(disablePreload, false))); + when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + DimensionTableDataManager tableDataManager = makeTableDataManager(helixManager); + + tableDataManager.addSegment(ImmutableSegmentLoader.load(_indexDir, _indexLoadingConfig, + SEGMENT_OPERATIONS_THROTTLER)); + + tableDataManager.shutDown(); + + Assert.assertNull(DimensionTableDataManager.getInstanceByTableName(tableDataManager.getTableName())); + } + @Test public void testLookupErrorOnDuplicatePrimaryKey() throws Exception { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DimensionTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DimensionTableIntegrationTest.java new file mode 100644 index 0000000000..945f2acbd7 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DimensionTableIntegrationTest.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager; +import org.apache.pinot.spi.config.table.DimensionTableConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class DimensionTableIntegrationTest extends BaseClusterIntegrationTest { + + protected static final Logger LOGGER = LoggerFactory.getLogger(DimensionTableIntegrationTest.class); + private static final String LONG_COL = "longCol"; + private static final String INT_COL = "intCol"; + + @Test + public void testDelete() + throws Exception { + JsonNode node = postQuery("select count(*) from " + getTableName()); + assertNoError(node); + Assert.assertEquals(node.get("resultTable").get("rows").get(0).get(0).asInt(), getCountStarResult()); + + dropOfflineTable(getTableName(), "-1d"); + + waitForEVToDisappear(TableNameBuilder.OFFLINE.tableNameWithType(getTableName())); + + Assert.assertNull( + DimensionTableDataManager.getInstanceByTableName(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()))); + } + + @Override + public String getTableName() { + return DEFAULT_TABLE_NAME; + } + + @Override + public Schema createSchema() { + return new Schema.SchemaBuilder() + .setSchemaName(getTableName()) + .addSingleValueDimension(LONG_COL, FieldSpec.DataType.LONG) + .addSingleValueDimension(INT_COL, FieldSpec.DataType.INT) + .setPrimaryKeyColumns(Collections.singletonList(LONG_COL)) + .build(); + } + + List<File> createAvroFiles() + throws Exception { + org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); + avroSchema.setFields(ImmutableList.of( + new org.apache.avro.Schema.Field(LONG_COL, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), + null, null), + new org.apache.avro.Schema.Field(INT_COL, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), + null, null))); + + ArrayList<File> files = new ArrayList<>(); + + for (int fi = 0; fi < 2; fi++) { + File file = new File(_tempDir, "data" + fi + ".avro"); + try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { + writer.create(avroSchema, file); + for (int i = 0; i < getCountStarResult() / 2; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put(LONG_COL, i); + record.put(INT_COL, i); + writer.append(record); + } + } + files.add(file); + } + return files; + } + + @Override + protected long getCountStarResult() { + return 1000; + } + + @BeforeClass + public void setUp() + throws Exception { + LOGGER.warn("Setting up integration test class: {}", getClass().getSimpleName()); + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + startZk(); + startController(); + startBroker(); + startServer(); + + if (_controllerRequestURLBuilder == null) { + _controllerRequestURLBuilder = + ControllerRequestURLBuilder.baseUrl("http://localhost:" + getControllerPort()); + } + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + Schema schema = createSchema(); + addSchema(schema); + + List<File> avroFiles = createAvroFiles(); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + waitForAllDocsLoaded(60_000); + LOGGER.warn("Finished setting up integration test class: {}", getClass().getSimpleName()); + } + + @AfterClass + public void tearDown() + throws Exception { + LOGGER.warn("Tearing down integration test class: {}", getClass().getSimpleName()); + FileUtils.deleteDirectory(_tempDir); + + stopServer(); + stopBroker(); + stopController(); + stopZk(); + FileUtils.deleteDirectory(_tempDir); + LOGGER.warn("Finished tearing down integration test class: {}", getClass().getSimpleName()); + } + + @Override + public TableConfig createOfflineTableConfig() { + return new TableConfigBuilder(TableType.OFFLINE) + .setTableName(getTableName()) + .setDimensionTableConfig(new DimensionTableConfig(false, false)) + .setIsDimTable(true) + .build(); + } +} diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java new file mode 100644 index 0000000000..6dd6143009 --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.perf; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.apache.commons.io.FileUtils; +import org.apache.helix.AccessOption; +import org.apache.helix.HelixManager; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.utils.SchemaUtils; +import org.apache.pinot.common.utils.config.TableConfigUtils; +import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager; +import org.apache.pinot.queries.BaseQueriesTest; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.utils.SegmentAllIndexPreprocessThrottler; +import org.apache.pinot.segment.local.utils.SegmentDownloadThrottler; +import org.apache.pinot.segment.local.utils.SegmentLocks; +import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; +import org.apache.pinot.segment.local.utils.SegmentStarTreePreprocessThrottler; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.DimensionTableConfig; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.mockito.Mockito; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; +import org.openjdk.jmh.runner.options.OptionsBuilder; + + +// Tests initialization of dimension tables +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Benchmark) +public class BenchmarkDimensionTableOverhead extends BaseQueriesTest { + + public static void main(String[] args) + throws Exception { + ChainedOptionsBuilder opt = new OptionsBuilder() + .include(BenchmarkDimensionTableOverhead.class.getSimpleName()) + .shouldDoGC(true); + new Runner(opt.build()).run(); + } + + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "FilteredAggregationsTest"); + private static final String TABLE_NAME = "MyTable"; + private static final String SEGMENT_NAME_TEMPLATE = "testSegment%d"; + private static final String INT_COL_NAME = "INT_COL"; + private static final String SORTED_COL_NAME = "SORTED_COL"; + private static final String RAW_INT_COL_NAME = "RAW_INT_COL"; + private static final String RAW_STRING_COL_NAME = "RAW_STRING_COL"; + private static final String NO_INDEX_INT_COL_NAME = "NO_INDEX_INT_COL"; + private static final String NO_INDEX_STRING_COL = "NO_INDEX_STRING_COL"; + private static final String LOW_CARDINALITY_STRING_COL = "LOW_CARDINALITY_STRING_COL"; + private static final String TIMESTAMP_COL = "TSTMP_COL"; + private static final List<FieldConfig> FIELD_CONFIGS = new ArrayList<>(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder() + .setSchemaName(TABLE_NAME) + .addSingleValueDimension(SORTED_COL_NAME, FieldSpec.DataType.INT) + .addSingleValueDimension(NO_INDEX_INT_COL_NAME, FieldSpec.DataType.INT) + .addSingleValueDimension(RAW_INT_COL_NAME, FieldSpec.DataType.INT) + .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT) + .addSingleValueDimension(RAW_STRING_COL_NAME, FieldSpec.DataType.STRING) + .addSingleValueDimension(NO_INDEX_STRING_COL, FieldSpec.DataType.STRING) + .addSingleValueDimension(LOW_CARDINALITY_STRING_COL, FieldSpec.DataType.STRING) + .addSingleValueDimension(TIMESTAMP_COL, FieldSpec.DataType.TIMESTAMP) + .setPrimaryKeyColumns(Arrays.asList(SORTED_COL_NAME, RAW_STRING_COL_NAME, NO_INDEX_STRING_COL, RAW_INT_COL_NAME)) + .build(); + + private static final SegmentOperationsThrottler SEGMENT_OPERATIONS_THROTTLER = new SegmentOperationsThrottler( + new SegmentAllIndexPreprocessThrottler(1, 2, true), + new SegmentStarTreePreprocessThrottler(1, 2, true), + new SegmentDownloadThrottler(1, 2, true)); + + @Param({"1"}) + private int _numSegments; + + @Param("3000000") + private int _numRows; + + @Param({"true", "false"}) + boolean _disablePreload; + + private static int _iteration = 0; + + private IndexSegment _indexSegment; + private List<IndexSegment> _indexSegments; + private Distribution.DataSupplier _supplier; + private DimensionTableDataManager _tableDataManager; + + @Setup(Level.Iteration) + public void setUp() + throws Exception { + _supplier = Distribution.createSupplier(42, "EXP(0.001)"); + FileUtils.deleteQuietly(INDEX_DIR); + + _indexSegments = new ArrayList<>(); + TableConfig tableConfig = getTableConfig(false); + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, SCHEMA); + for (int i = 0; i < _numSegments; i++) { + buildSegment(String.format(SEGMENT_NAME_TEMPLATE, i), tableConfig); + _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, String.format(SEGMENT_NAME_TEMPLATE, i)), + indexLoadingConfig)); + } + _indexSegment = _indexSegments.get(0); + + System.gc(); + } + + @Benchmark + public DimensionTableDataManager benchmark() + throws JsonProcessingException { + TableConfig tableConfig = getTableConfig(_disablePreload); + + HelixManager helixManager = Mockito.mock(HelixManager.class); + ZkHelixPropertyStore<ZNRecord> propertyStore = Mockito.mock(ZkHelixPropertyStore.class); + + Mockito.when(propertyStore.get("/SCHEMAS/" + TABLE_NAME, null, AccessOption.PERSISTENT)) + .thenReturn(SchemaUtils.toZNRecord(SCHEMA)); + + Mockito.when(propertyStore.get("/CONFIGS/TABLE/MyTable_OFFLINE", null, AccessOption.PERSISTENT)) + .thenReturn(TableConfigUtils.toZNRecord(tableConfig)); + + Mockito.when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + + InstanceDataManagerConfig instanceDataManagerConfig = Mockito.mock(InstanceDataManagerConfig.class); + Mockito.when(instanceDataManagerConfig.getInstanceDataDir()) + .thenReturn(INDEX_DIR.getParentFile().getAbsolutePath()); + + String tableName = TABLE_NAME + "_" + _iteration; + _tableDataManager = DimensionTableDataManager.createInstanceByTableName(tableName); + _tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, null, null, + SEGMENT_OPERATIONS_THROTTLER); + _tableDataManager.start(); + + for (int i = 0; i < _indexSegments.size(); i++) { + _tableDataManager.addSegment((ImmutableSegment) _indexSegments.get(i)); + } + + return _tableDataManager; + } + + @TearDown(Level.Iteration) + public void tearDown() { + _tableDataManager.shutDown(); + + for (IndexSegment indexSegment : _indexSegments) { + indexSegment.destroy(); + } + + FileUtils.deleteQuietly(INDEX_DIR); + EXECUTOR_SERVICE.shutdownNow(); + + _iteration++; + } + + private TableConfig getTableConfig(boolean disablePreload) { + return new TableConfigBuilder(TableType.OFFLINE) + .setTableName(TABLE_NAME) + .setFieldConfigList(FIELD_CONFIGS) + .setNoDictionaryColumns(List.of(RAW_INT_COL_NAME, RAW_STRING_COL_NAME, TIMESTAMP_COL)) + .setSortedColumn(SORTED_COL_NAME) + .setDimensionTableConfig(new DimensionTableConfig(disablePreload, false)) + .build(); + } + + static LazyDataGenerator createTestData(int numRows, Distribution.DataSupplier supplier) { + //create data lazily to prevent OOM and speed up setup + + return new LazyDataGenerator() { + private final Map<Integer, UUID> _strings = new HashMap<>(); + private final String[] _lowCardinalityValues = + IntStream.range(0, 10).mapToObj(i -> "value" + i).toArray(String[]::new); + private Distribution.DataSupplier _supplier = supplier; + + @Override + public int size() { + return numRows; + } + + @Override + public GenericRow next(GenericRow row, int i) { + row.putValue(SORTED_COL_NAME, numRows - i); + row.putValue(INT_COL_NAME, (int) _supplier.getAsLong()); + row.putValue(NO_INDEX_INT_COL_NAME, (int) _supplier.getAsLong()); + row.putValue(RAW_INT_COL_NAME, (int) _supplier.getAsLong()); + long rawStrKey = (_supplier.getAsLong()); + row.putValue(RAW_STRING_COL_NAME, + _strings.computeIfAbsent((int) rawStrKey, k -> UUID.randomUUID()).toString()); + row.putValue(NO_INDEX_STRING_COL, row.getValue(RAW_STRING_COL_NAME)); + row.putValue(LOW_CARDINALITY_STRING_COL, _lowCardinalityValues[i % _lowCardinalityValues.length]); + row.putValue(TIMESTAMP_COL, i * 1200 * 1000L); + + return null; + } + + @Override + public void rewind() { + _strings.clear(); + _supplier.reset(); + } + }; + } + + private void buildSegment(String segmentName, TableConfig tableConfig) + throws Exception { + + LazyDataGenerator dataGenerator = createTestData(_numRows, _supplier); + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, SCHEMA); + config.setOutDir(INDEX_DIR.getPath()); + config.setTableName(TABLE_NAME); + config.setSegmentName(segmentName); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + try (RecordReader recordReader = new GeneratedDataRecordReader(dataGenerator)) { + driver.init(config, recordReader); + driver.build(); + } + + //save generator state so that other segments are not identical to this one + _supplier.snapshot(); + } + + @Override + protected String getFilter() { + return null; + } + + @Override + protected IndexSegment getIndexSegment() { + return _indexSegment; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return _indexSegments; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java index 1f43543018..6e07ed66a8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java @@ -190,8 +190,11 @@ public class LookupJoinOperator extends MultiStageOperator { private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) { List<Object[]> container = leftBlock.getContainer(); List<Object[]> rows = new ArrayList<>(container.size()); + PrimaryKey key = new PrimaryKey(new Object[_leftKeyIds.length]); + for (Object[] leftRow : container) { - if (_rightTable.containsKey(getKey(leftRow))) { + fillKey(leftRow, key); + if (_rightTable.containsKey(key)) { rows.add(leftRow); } } @@ -201,8 +204,11 @@ public class LookupJoinOperator extends MultiStageOperator { private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) { List<Object[]> container = leftBlock.getContainer(); List<Object[]> rows = new ArrayList<>(container.size()); + PrimaryKey key = new PrimaryKey(new Object[_leftKeyIds.length]); + for (Object[] leftRow : container) { - if (!_rightTable.containsKey(getKey(leftRow))) { + fillKey(leftRow, key); + if (!_rightTable.containsKey(key)) { rows.add(leftRow); } } @@ -217,6 +223,13 @@ public class LookupJoinOperator extends MultiStageOperator { return new PrimaryKey(values); } + private void fillKey(Object[] row, PrimaryKey key) { + Object[] values = key.getValues(); + for (int i = 0; i < _leftKeyIds.length; i++) { + values[i] = row[_leftKeyIds[i]]; + } + } + private Object[] joinRow(Object[] leftRow, @Nullable Object[] rightRow) { Object[] resultRow = new Object[_resultColumnSize]; System.arraycopy(leftRow, 0, resultRow, 0, leftRow.length); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java index 11e256f886..f1e5834205 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java @@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.segment.readers; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,6 +49,11 @@ public class PinotSegmentRecordReader implements RecordReader { private IndexSegment _indexSegment; private boolean _destroySegmentOnClose; private int _numDocs; + + // These two collection allow for more efficient value reading - via indexes + private ArrayList<String> _columnNames; + private ArrayList<PinotSegmentColumnReader> _columnReaders; + private Map<String, PinotSegmentColumnReader> _columnReaderMap; private int[] _sortedDocIds; private boolean _skipDefaultNullValues; @@ -157,15 +163,23 @@ public class PinotSegmentRecordReader implements RecordReader { if (_numDocs > 0) { _columnReaderMap = new HashMap<>(); + _columnReaders = new ArrayList<>(); + _columnNames = new ArrayList<>(); Set<String> columnsInSegment = _indexSegment.getPhysicalColumnNames(); if (CollectionUtils.isEmpty(fieldsToRead)) { for (String column : columnsInSegment) { - _columnReaderMap.put(column, new PinotSegmentColumnReader(indexSegment, column)); + PinotSegmentColumnReader reader = new PinotSegmentColumnReader(indexSegment, column); + _columnReaderMap.put(column, reader); + _columnNames.add(column); + _columnReaders.add(reader); } } else { for (String column : fieldsToRead) { if (columnsInSegment.contains(column)) { - _columnReaderMap.put(column, new PinotSegmentColumnReader(indexSegment, column)); + PinotSegmentColumnReader reader = new PinotSegmentColumnReader(indexSegment, column); + _columnReaderMap.put(column, reader); + _columnNames.add(column); + _columnReaders.add(reader); } else { LOGGER.warn("Ignoring column: {} that does not exist in the segment", column); } @@ -226,6 +240,33 @@ public class PinotSegmentRecordReader implements RecordReader { } } + public Object[] getRecordValues(int docId, int[] columnIndexes) { + Object[] values = new Object[columnIndexes.length]; + for (int i = 0, n = columnIndexes.length; i < n; i++) { + int columnIndex = columnIndexes[i]; + if (columnIndex > -1) { + PinotSegmentColumnReader columnReader = _columnReaders.get(columnIndex); + if (!columnReader.isNull(docId)) { + values[i] = columnReader.getValue(docId); + } else if (!_skipDefaultNullValues) { + values[i] = columnReader.getValue(docId); + } // else null value is kept + } // else keep null value + } + + return values; + } + + public int[] getIndexesForColumns(List<String> columnNames) { + int[] indexes = new int[columnNames.size()]; + + for (int i = 0, n = columnNames.size(); i < n; i++) { + indexes[i] = _columnNames.indexOf(columnNames.get(i)); + } + + return indexes; + } + // TODO: // - Currently there is no check on column existence // - Null value is not handled (default null value is returned) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index 54c7ac9cb4..081647b7d4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -282,7 +282,15 @@ public class ControllerRequestURLBuilder { } public String forTableDelete(String tableName) { - return StringUtil.join("/", _baseUrl, "tables", tableName); + return forTableDelete(tableName, null); + } + + public String forTableDelete(String tableName, String retention) { + String url = StringUtil.join("/", _baseUrl, "tables", tableName); + if (retention != null) { + url += "?retention=" + retention; + } + return url; } public String forTableView(String tableName, String view, @Nullable String tableType) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org