This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b371feb933 extend CompactedPinotSegmentRecordReader so that it can skip deleteRecord (#13352) b371feb933 is described below commit b371feb933036b92a1400a8dc7b4dc7f2f739ea8 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Mon Jun 10 15:50:36 2024 -0700 extend CompactedPinotSegmentRecordReader so that it can skip deleteRecord (#13352) * extend CompactedPinotSegmentRecordReader so that it can skip deleteRecord --- .../util/ServerSegmentMetadataReader.java | 6 +-- .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 12 +++--- .../readers/CompactedPinotSegmentRecordReader.java | 23 ++++++++--- .../CompactedPinotSegmentRecordReaderTest.java | 47 +++++++++++++++++++++- 4 files changed, 73 insertions(+), 15 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index dcca712af4..08b8b2d71e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -265,7 +265,7 @@ public class ServerSegmentMetadataReader { List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfoList = JsonUtils.stringToObject(validDocIdsMetadataList, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() { }); - for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo: validDocIdsMetadataInfoList) { + for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo : validDocIdsMetadataInfoList) { validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), validDocIdsMetadataInfo); } returnedServerRequestsCount++; @@ -286,8 +286,8 @@ public class ServerSegmentMetadataReader { } if (segmentNames != null && !segmentNames.isEmpty() && segmentNames.size() != validDocIdsMetadataInfos.size()) { - LOGGER.error("Unable to get validDocIdsMetadata for all segments. Expected: {}, Actual: {}", - segmentNames.size(), validDocIdsMetadataInfos.size()); + LOGGER.error("Unable to get validDocIdsMetadata for all segments. Expected: {}, Actual: {}", segmentNames.size(), + validDocIdsMetadataInfos.size()); } LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server requests.", diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index 25e562283e..a827b9fc45 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -94,8 +94,8 @@ public class MinionTaskUtils { String pushMode = IngestionConfigUtils.getPushMode(taskConfigs); Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(taskConfigs); - if (pushMode == null - || pushMode.toUpperCase().contentEquals(BatchConfigProperties.SegmentPushType.TAR.toString())) { + if (pushMode == null || pushMode.toUpperCase() + .contentEquals(BatchConfigProperties.SegmentPushType.TAR.toString())) { singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.toString()); } else { @@ -158,7 +158,7 @@ public class MinionTaskUtils { ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader(); try { return serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint, - validDocIdsType, 60_000); + validDocIdsType, 60_000); } catch (Exception e) { LOGGER.info("Unable to retrieve validDocIdsBitmap for {} from {}", segmentName, endpoint); } @@ -167,7 +167,7 @@ public class MinionTaskUtils { } public static List<String> getServers(String segmentName, String tableNameWithType, HelixAdmin helixAdmin, - String clusterName) { + String clusterName) { ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType); if (externalView == null) { throw new IllegalStateException("External view does not exist for table: " + tableNameWithType); @@ -197,8 +197,8 @@ public class MinionTaskUtils { if (tableTaskConfig != null) { Map<String, String> configs = tableTaskConfig.getConfigsForTaskType(taskType); if (configs != null && !configs.isEmpty()) { - return Boolean.parseBoolean(configs.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, - String.valueOf(defaultValue))); + return Boolean.parseBoolean( + configs.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, String.valueOf(defaultValue))); } } return defaultValue; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java index 07f82d837a..8cab202bf9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.apache.pinot.spi.utils.BooleanUtils; import org.roaringbitmap.PeekableIntIterator; import org.roaringbitmap.RoaringBitmap; @@ -36,19 +37,25 @@ import org.roaringbitmap.RoaringBitmap; public class CompactedPinotSegmentRecordReader implements RecordReader { private final PinotSegmentRecordReader _pinotSegmentRecordReader; private final RoaringBitmap _validDocIdsBitmap; - + private final String _deleteRecordColumn; + // Reusable generic row to store the next row to return + private final GenericRow _nextRow = new GenericRow(); // Valid doc ids iterator private PeekableIntIterator _validDocIdsIterator; - // Reusable generic row to store the next row to return - private GenericRow _nextRow = new GenericRow(); // Flag to mark whether we need to fetch another row private boolean _nextRowReturned = true; public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap validDocIds) { + this(indexDir, validDocIds, null); + } + + public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap validDocIds, + @Nullable String deleteRecordColumn) { _pinotSegmentRecordReader = new PinotSegmentRecordReader(); _pinotSegmentRecordReader.init(indexDir, null, null); _validDocIdsBitmap = validDocIds; _validDocIdsIterator = validDocIds.getIntIterator(); + _deleteRecordColumn = deleteRecordColumn; } @Override @@ -67,11 +74,17 @@ public class CompactedPinotSegmentRecordReader implements RecordReader { return true; } - // Try to get the next row to return - if (_validDocIdsIterator.hasNext()) { + // Try to get the next row to return, skip invalid docs. If _deleteRecordColumn is set, the deleteRecord (i.e. + // the tombstone record used to soft-delete old record) is also skipped. + // Note that dropping deleteRecord too soon may cause the old soft-deleted record to show up unexpectedly, so one + // should be careful when to skip the deleteRecord. + while (_validDocIdsIterator.hasNext()) { int docId = _validDocIdsIterator.next(); _nextRow.clear(); _pinotSegmentRecordReader.getRecord(docId, _nextRow); + if (_deleteRecordColumn != null && BooleanUtils.toBoolean(_nextRow.getValue(_deleteRecordColumn))) { + continue; + } _nextRowReturned = false; return true; } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java index b9afc3ce88..e7276211c5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java @@ -46,6 +46,7 @@ public class CompactedPinotSegmentRecordReaderTest { private static final String M1 = "m1"; private static final String M2 = "m2"; private static final String TIME = "t"; + private static final String DELETE_COLUMN = "del_col"; private String _segmentOutputDir; private File _segmentIndexDir; @@ -60,6 +61,10 @@ public class CompactedPinotSegmentRecordReaderTest { String segmentName = "compactedPinotSegmentRecordReaderTest"; _segmentOutputDir = Files.createTempDir().toString(); _rows = PinotSegmentUtil.createTestData(schema, NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + GenericRow row = _rows.get(i); + row.putValue(DELETE_COLUMN, i % 2 == 0 ? "true" : "false"); + } _recordReader = new GenericRowRecordReader(_rows); _segmentIndexDir = PinotSegmentUtil.createSegment(tableConfig, schema, segmentName, _segmentOutputDir, _recordReader); @@ -67,6 +72,7 @@ public class CompactedPinotSegmentRecordReaderTest { private Schema createPinotSchema() { return new Schema.SchemaBuilder().setSchemaName("schema").addSingleValueDimension(D_SV_1, FieldSpec.DataType.STRING) + .addSingleValueDimension(DELETE_COLUMN, FieldSpec.DataType.STRING) .addMultiValueDimension(D_MV_1, FieldSpec.DataType.STRING).addMetric(M1, FieldSpec.DataType.INT) .addMetric(M2, FieldSpec.DataType.FLOAT) .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.HOURS, TIME), null).build(); @@ -79,7 +85,6 @@ public class CompactedPinotSegmentRecordReaderTest { @Test public void testCompactedPinotSegmentRecordReader() throws Exception { - RoaringBitmap validDocIds = new RoaringBitmap(); for (int i = 0; i < NUM_ROWS; i += 2) { validDocIds.add(i); @@ -124,6 +129,46 @@ public class CompactedPinotSegmentRecordReaderTest { } } + @Test + public void testCompactedPinotSegmentRecordReaderWithDeleteColumn() + throws Exception { + RoaringBitmap validDocIds = new RoaringBitmap(); + for (int i = 0; i < NUM_ROWS; i += 2) { + validDocIds.add(i); + } + List<GenericRow> evenOutputRows = new ArrayList<>(); + try (CompactedPinotSegmentRecordReader compactedReader = new CompactedPinotSegmentRecordReader(_segmentIndexDir, + validDocIds, DELETE_COLUMN)) { + while (compactedReader.hasNext()) { + evenOutputRows.add(compactedReader.next()); + } + } + + validDocIds = new RoaringBitmap(); + for (int i = 1; i < NUM_ROWS; i += 2) { + validDocIds.add(i); + } + List<GenericRow> oddOutputRows = new ArrayList<>(); + try (CompactedPinotSegmentRecordReader compactedReader = new CompactedPinotSegmentRecordReader(_segmentIndexDir, + validDocIds, DELETE_COLUMN)) { + while (compactedReader.hasNext()) { + oddOutputRows.add(compactedReader.next()); + } + } + + Assert.assertEquals(evenOutputRows.size(), 0, "All even rows are deleted"); + Assert.assertEquals(oddOutputRows.size(), NUM_ROWS / 2, "All odd rows are kept"); + for (int i = 0; i < oddOutputRows.size(); i++) { + GenericRow outputRow = oddOutputRows.get(i); + GenericRow row = _rows.get(i * 2 + 1); + Assert.assertEquals(outputRow.getValue(D_SV_1), row.getValue(D_SV_1)); + Assert.assertTrue(PinotSegmentUtil.compareMultiValueColumn(outputRow.getValue(D_MV_1), row.getValue(D_MV_1))); + Assert.assertEquals(outputRow.getValue(M1), row.getValue(M1)); + Assert.assertEquals(outputRow.getValue(M2), row.getValue(M2)); + Assert.assertEquals(outputRow.getValue(TIME), row.getValue(TIME)); + } + } + @AfterClass public void cleanup() { FileUtils.deleteQuietly(new File(_segmentOutputDir)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org