This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e825e13978 Revert "support deleteColumn for compactionTask by extending the record reader (#13342)" (#13351) e825e13978 is described below commit e825e139787bc3f6e460f4b7cb1b390e8fd1dfc1 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Mon Jun 10 14:33:41 2024 -0700 Revert "support deleteColumn for compactionTask by extending the record reader (#13342)" (#13351) This reverts commit caf25238f4166a8b5fbddaab64dc3df9a99a6275. --- .../util/ServerSegmentMetadataReader.java | 6 +-- .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 12 +++--- .../UpsertCompactionTaskExecutor.java | 2 +- .../readers/CompactedPinotSegmentRecordReader.java | 20 +++------ .../CompactedPinotSegmentRecordReaderTest.java | 47 +--------------------- 5 files changed, 16 insertions(+), 71 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index 08b8b2d71e..dcca712af4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -265,7 +265,7 @@ public class ServerSegmentMetadataReader { List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfoList = JsonUtils.stringToObject(validDocIdsMetadataList, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() { }); - for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo : validDocIdsMetadataInfoList) { + for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo: validDocIdsMetadataInfoList) { validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), validDocIdsMetadataInfo); } returnedServerRequestsCount++; @@ -286,8 +286,8 @@ public class ServerSegmentMetadataReader { } if (segmentNames != null && !segmentNames.isEmpty() && segmentNames.size() != validDocIdsMetadataInfos.size()) { - LOGGER.error("Unable to get validDocIdsMetadata for all segments. Expected: {}, Actual: {}", segmentNames.size(), - validDocIdsMetadataInfos.size()); + LOGGER.error("Unable to get validDocIdsMetadata for all segments. Expected: {}, Actual: {}", + segmentNames.size(), validDocIdsMetadataInfos.size()); } LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server requests.", diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index a827b9fc45..25e562283e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -94,8 +94,8 @@ public class MinionTaskUtils { String pushMode = IngestionConfigUtils.getPushMode(taskConfigs); Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(taskConfigs); - if (pushMode == null || pushMode.toUpperCase() - .contentEquals(BatchConfigProperties.SegmentPushType.TAR.toString())) { + if (pushMode == null + || pushMode.toUpperCase().contentEquals(BatchConfigProperties.SegmentPushType.TAR.toString())) { singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.toString()); } else { @@ -158,7 +158,7 @@ public class MinionTaskUtils { ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader(); try { return serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint, - validDocIdsType, 60_000); + validDocIdsType, 60_000); } catch (Exception e) { LOGGER.info("Unable to retrieve validDocIdsBitmap for {} from {}", segmentName, endpoint); } @@ -167,7 +167,7 @@ public class MinionTaskUtils { } public static List<String> getServers(String segmentName, String tableNameWithType, HelixAdmin helixAdmin, - String clusterName) { + String clusterName) { ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType); if (externalView == null) { throw new IllegalStateException("External view does not exist for table: " + tableNameWithType); @@ -197,8 +197,8 @@ public class MinionTaskUtils { if (tableTaskConfig != null) { Map<String, String> configs = tableTaskConfig.getConfigsForTaskType(taskType); if (configs != null && !configs.isEmpty()) { - return Boolean.parseBoolean( - configs.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, String.valueOf(defaultValue))); + return Boolean.parseBoolean(configs.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, + String.valueOf(defaultValue))); } } return defaultValue; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java index 7918be4e9d..ec5cc127d9 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java @@ -99,7 +99,7 @@ public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExe } try (CompactedPinotSegmentRecordReader compactedRecordReader = new CompactedPinotSegmentRecordReader(indexDir, - validDocIds, tableConfig.getUpsertDeleteRecordColumn())) { + validDocIds)) { SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName, getSchema(tableNameWithType)); SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java index dc7b8b5c9e..07f82d837a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java @@ -26,7 +26,6 @@ import javax.annotation.Nullable; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderConfig; -import org.apache.pinot.spi.utils.BooleanUtils; import org.roaringbitmap.PeekableIntIterator; import org.roaringbitmap.RoaringBitmap; @@ -37,25 +36,19 @@ import org.roaringbitmap.RoaringBitmap; public class CompactedPinotSegmentRecordReader implements RecordReader { private final PinotSegmentRecordReader _pinotSegmentRecordReader; private final RoaringBitmap _validDocIdsBitmap; - private final String _deleteRecordColumn; - // Reusable generic row to store the next row to return - private final GenericRow _nextRow = new GenericRow(); + // Valid doc ids iterator private PeekableIntIterator _validDocIdsIterator; + // Reusable generic row to store the next row to return + private GenericRow _nextRow = new GenericRow(); // Flag to mark whether we need to fetch another row private boolean _nextRowReturned = true; public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap validDocIds) { - this(indexDir, validDocIds, null); - } - - public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap validDocIds, - @Nullable String deleteRecordColumn) { _pinotSegmentRecordReader = new PinotSegmentRecordReader(); _pinotSegmentRecordReader.init(indexDir, null, null); _validDocIdsBitmap = validDocIds; _validDocIdsIterator = validDocIds.getIntIterator(); - _deleteRecordColumn = deleteRecordColumn; } @Override @@ -74,14 +67,11 @@ public class CompactedPinotSegmentRecordReader implements RecordReader { return true; } - // Try to get the next row to return, skip invalid doc and deleted doc. - while (_validDocIdsIterator.hasNext()) { + // Try to get the next row to return + if (_validDocIdsIterator.hasNext()) { int docId = _validDocIdsIterator.next(); _nextRow.clear(); _pinotSegmentRecordReader.getRecord(docId, _nextRow); - if (_deleteRecordColumn != null && BooleanUtils.toBoolean(_nextRow.getValue(_deleteRecordColumn))) { - continue; - } _nextRowReturned = false; return true; } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java index e7276211c5..b9afc3ce88 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java @@ -46,7 +46,6 @@ public class CompactedPinotSegmentRecordReaderTest { private static final String M1 = "m1"; private static final String M2 = "m2"; private static final String TIME = "t"; - private static final String DELETE_COLUMN = "del_col"; private String _segmentOutputDir; private File _segmentIndexDir; @@ -61,10 +60,6 @@ public class CompactedPinotSegmentRecordReaderTest { String segmentName = "compactedPinotSegmentRecordReaderTest"; _segmentOutputDir = Files.createTempDir().toString(); _rows = PinotSegmentUtil.createTestData(schema, NUM_ROWS); - for (int i = 0; i < NUM_ROWS; i++) { - GenericRow row = _rows.get(i); - row.putValue(DELETE_COLUMN, i % 2 == 0 ? "true" : "false"); - } _recordReader = new GenericRowRecordReader(_rows); _segmentIndexDir = PinotSegmentUtil.createSegment(tableConfig, schema, segmentName, _segmentOutputDir, _recordReader); @@ -72,7 +67,6 @@ public class CompactedPinotSegmentRecordReaderTest { private Schema createPinotSchema() { return new Schema.SchemaBuilder().setSchemaName("schema").addSingleValueDimension(D_SV_1, FieldSpec.DataType.STRING) - .addSingleValueDimension(DELETE_COLUMN, FieldSpec.DataType.STRING) .addMultiValueDimension(D_MV_1, FieldSpec.DataType.STRING).addMetric(M1, FieldSpec.DataType.INT) .addMetric(M2, FieldSpec.DataType.FLOAT) .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.HOURS, TIME), null).build(); @@ -85,6 +79,7 @@ public class CompactedPinotSegmentRecordReaderTest { @Test public void testCompactedPinotSegmentRecordReader() throws Exception { + RoaringBitmap validDocIds = new RoaringBitmap(); for (int i = 0; i < NUM_ROWS; i += 2) { validDocIds.add(i); @@ -129,46 +124,6 @@ public class CompactedPinotSegmentRecordReaderTest { } } - @Test - public void testCompactedPinotSegmentRecordReaderWithDeleteColumn() - throws Exception { - RoaringBitmap validDocIds = new RoaringBitmap(); - for (int i = 0; i < NUM_ROWS; i += 2) { - validDocIds.add(i); - } - List<GenericRow> evenOutputRows = new ArrayList<>(); - try (CompactedPinotSegmentRecordReader compactedReader = new CompactedPinotSegmentRecordReader(_segmentIndexDir, - validDocIds, DELETE_COLUMN)) { - while (compactedReader.hasNext()) { - evenOutputRows.add(compactedReader.next()); - } - } - - validDocIds = new RoaringBitmap(); - for (int i = 1; i < NUM_ROWS; i += 2) { - validDocIds.add(i); - } - List<GenericRow> oddOutputRows = new ArrayList<>(); - try (CompactedPinotSegmentRecordReader compactedReader = new CompactedPinotSegmentRecordReader(_segmentIndexDir, - validDocIds, DELETE_COLUMN)) { - while (compactedReader.hasNext()) { - oddOutputRows.add(compactedReader.next()); - } - } - - Assert.assertEquals(evenOutputRows.size(), 0, "All even rows are deleted"); - Assert.assertEquals(oddOutputRows.size(), NUM_ROWS / 2, "All odd rows are kept"); - for (int i = 0; i < oddOutputRows.size(); i++) { - GenericRow outputRow = oddOutputRows.get(i); - GenericRow row = _rows.get(i * 2 + 1); - Assert.assertEquals(outputRow.getValue(D_SV_1), row.getValue(D_SV_1)); - Assert.assertTrue(PinotSegmentUtil.compareMultiValueColumn(outputRow.getValue(D_MV_1), row.getValue(D_MV_1))); - Assert.assertEquals(outputRow.getValue(M1), row.getValue(M1)); - Assert.assertEquals(outputRow.getValue(M2), row.getValue(M2)); - Assert.assertEquals(outputRow.getValue(TIME), row.getValue(TIME)); - } - } - @AfterClass public void cleanup() { FileUtils.deleteQuietly(new File(_segmentOutputDir)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org