This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b371feb933 extend CompactedPinotSegmentRecordReader so that it can skip deleteRecord (#13352)
b371feb933 is described below

commit b371feb933036b92a1400a8dc7b4dc7f2f739ea8
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Mon Jun 10 15:50:36 2024 -0700

    extend CompactedPinotSegmentRecordReader so that it can skip deleteRecord (#13352)
    
    * extend CompactedPinotSegmentRecordReader so that it can skip deleteRecord
---
 .../util/ServerSegmentMetadataReader.java          |  6 +--
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 12 +++---
 .../readers/CompactedPinotSegmentRecordReader.java | 23 ++++++++---
 .../CompactedPinotSegmentRecordReaderTest.java     | 47 +++++++++++++++++++++-
 4 files changed, 73 insertions(+), 15 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index dcca712af4..08b8b2d71e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -265,7 +265,7 @@ public class ServerSegmentMetadataReader {
         List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfoList =
             JsonUtils.stringToObject(validDocIdsMetadataList, new 
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
             });
-        for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo: 
validDocIdsMetadataInfoList) {
+        for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo : 
validDocIdsMetadataInfoList) {
           
validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), 
validDocIdsMetadataInfo);
         }
         returnedServerRequestsCount++;
@@ -286,8 +286,8 @@ public class ServerSegmentMetadataReader {
     }
 
     if (segmentNames != null && !segmentNames.isEmpty() && segmentNames.size() 
!= validDocIdsMetadataInfos.size()) {
-      LOGGER.error("Unable to get validDocIdsMetadata for all segments. 
Expected: {}, Actual: {}",
-          segmentNames.size(), validDocIdsMetadataInfos.size());
+      LOGGER.error("Unable to get validDocIdsMetadata for all segments. 
Expected: {}, Actual: {}", segmentNames.size(),
+          validDocIdsMetadataInfos.size());
     }
 
     LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server 
requests.",
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 25e562283e..a827b9fc45 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -94,8 +94,8 @@ public class MinionTaskUtils {
       String pushMode = IngestionConfigUtils.getPushMode(taskConfigs);
 
       Map<String, String> singleFileGenerationTaskConfig = new 
HashMap<>(taskConfigs);
-      if (pushMode == null
-          || 
pushMode.toUpperCase().contentEquals(BatchConfigProperties.SegmentPushType.TAR.toString()))
 {
+      if (pushMode == null || pushMode.toUpperCase()
+          
.contentEquals(BatchConfigProperties.SegmentPushType.TAR.toString())) {
         singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
             BatchConfigProperties.SegmentPushType.TAR.toString());
       } else {
@@ -158,7 +158,7 @@ public class MinionTaskUtils {
       ServerSegmentMetadataReader serverSegmentMetadataReader = new 
ServerSegmentMetadataReader();
       try {
         return 
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, 
segmentName, endpoint,
-                validDocIdsType, 60_000);
+            validDocIdsType, 60_000);
       } catch (Exception e) {
         LOGGER.info("Unable to retrieve validDocIdsBitmap for {} from {}", 
segmentName, endpoint);
       }
@@ -167,7 +167,7 @@ public class MinionTaskUtils {
   }
 
   public static List<String> getServers(String segmentName, String 
tableNameWithType, HelixAdmin helixAdmin,
-                                        String clusterName) {
+      String clusterName) {
     ExternalView externalView = 
helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
     if (externalView == null) {
       throw new IllegalStateException("External view does not exist for table: 
" + tableNameWithType);
@@ -197,8 +197,8 @@ public class MinionTaskUtils {
     if (tableTaskConfig != null) {
       Map<String, String> configs = 
tableTaskConfig.getConfigsForTaskType(taskType);
       if (configs != null && !configs.isEmpty()) {
-        return 
Boolean.parseBoolean(configs.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER,
-            String.valueOf(defaultValue)));
+        return Boolean.parseBoolean(
+            
configs.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, 
String.valueOf(defaultValue)));
       }
     }
     return defaultValue;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
index 07f82d837a..8cab202bf9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.utils.BooleanUtils;
 import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
@@ -36,19 +37,25 @@ import org.roaringbitmap.RoaringBitmap;
 public class CompactedPinotSegmentRecordReader implements RecordReader {
   private final PinotSegmentRecordReader _pinotSegmentRecordReader;
   private final RoaringBitmap _validDocIdsBitmap;
-
+  private final String _deleteRecordColumn;
+  // Reusable generic row to store the next row to return
+  private final GenericRow _nextRow = new GenericRow();
   // Valid doc ids iterator
   private PeekableIntIterator _validDocIdsIterator;
-  // Reusable generic row to store the next row to return
-  private GenericRow _nextRow = new GenericRow();
   // Flag to mark whether we need to fetch another row
   private boolean _nextRowReturned = true;
 
   public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap 
validDocIds) {
+    this(indexDir, validDocIds, null);
+  }
+
+  public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap 
validDocIds,
+      @Nullable String deleteRecordColumn) {
     _pinotSegmentRecordReader = new PinotSegmentRecordReader();
     _pinotSegmentRecordReader.init(indexDir, null, null);
     _validDocIdsBitmap = validDocIds;
     _validDocIdsIterator = validDocIds.getIntIterator();
+    _deleteRecordColumn = deleteRecordColumn;
   }
 
   @Override
@@ -67,11 +74,17 @@ public class CompactedPinotSegmentRecordReader implements 
RecordReader {
       return true;
     }
 
-    // Try to get the next row to return
-    if (_validDocIdsIterator.hasNext()) {
+    // Try to get the next row to return, skip invalid docs. If _deleteRecordColumn is set, the deleteRecord (i.e.
+    // the tombstone record used to soft-delete old record) is also skipped.
+    // Note that dropping deleteRecord too soon may cause the old soft-deleted record to show up unexpectedly, so one
+    // should be careful when to skip the deleteRecord.
+    while (_validDocIdsIterator.hasNext()) {
       int docId = _validDocIdsIterator.next();
       _nextRow.clear();
       _pinotSegmentRecordReader.getRecord(docId, _nextRow);
+      if (_deleteRecordColumn != null && 
BooleanUtils.toBoolean(_nextRow.getValue(_deleteRecordColumn))) {
+        continue;
+      }
       _nextRowReturned = false;
       return true;
     }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java
index b9afc3ce88..e7276211c5 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReaderTest.java
@@ -46,6 +46,7 @@ public class CompactedPinotSegmentRecordReaderTest {
   private static final String M1 = "m1";
   private static final String M2 = "m2";
   private static final String TIME = "t";
+  private static final String DELETE_COLUMN = "del_col";
 
   private String _segmentOutputDir;
   private File _segmentIndexDir;
@@ -60,6 +61,10 @@ public class CompactedPinotSegmentRecordReaderTest {
     String segmentName = "compactedPinotSegmentRecordReaderTest";
     _segmentOutputDir = Files.createTempDir().toString();
     _rows = PinotSegmentUtil.createTestData(schema, NUM_ROWS);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      GenericRow row = _rows.get(i);
+      row.putValue(DELETE_COLUMN, i % 2 == 0 ? "true" : "false");
+    }
     _recordReader = new GenericRowRecordReader(_rows);
     _segmentIndexDir =
         PinotSegmentUtil.createSegment(tableConfig, schema, segmentName, 
_segmentOutputDir, _recordReader);
@@ -67,6 +72,7 @@ public class CompactedPinotSegmentRecordReaderTest {
 
   private Schema createPinotSchema() {
     return new 
Schema.SchemaBuilder().setSchemaName("schema").addSingleValueDimension(D_SV_1, 
FieldSpec.DataType.STRING)
+        .addSingleValueDimension(DELETE_COLUMN, FieldSpec.DataType.STRING)
         .addMultiValueDimension(D_MV_1, 
FieldSpec.DataType.STRING).addMetric(M1, FieldSpec.DataType.INT)
         .addMetric(M2, FieldSpec.DataType.FLOAT)
         .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.HOURS, TIME), null).build();
@@ -79,7 +85,6 @@ public class CompactedPinotSegmentRecordReaderTest {
   @Test
   public void testCompactedPinotSegmentRecordReader()
       throws Exception {
-
     RoaringBitmap validDocIds = new RoaringBitmap();
     for (int i = 0; i < NUM_ROWS; i += 2) {
       validDocIds.add(i);
@@ -124,6 +129,46 @@ public class CompactedPinotSegmentRecordReaderTest {
     }
   }
 
+  @Test
+  public void testCompactedPinotSegmentRecordReaderWithDeleteColumn()
+      throws Exception {
+    RoaringBitmap validDocIds = new RoaringBitmap();
+    for (int i = 0; i < NUM_ROWS; i += 2) {
+      validDocIds.add(i);
+    }
+    List<GenericRow> evenOutputRows = new ArrayList<>();
+    try (CompactedPinotSegmentRecordReader compactedReader = new 
CompactedPinotSegmentRecordReader(_segmentIndexDir,
+        validDocIds, DELETE_COLUMN)) {
+      while (compactedReader.hasNext()) {
+        evenOutputRows.add(compactedReader.next());
+      }
+    }
+
+    validDocIds = new RoaringBitmap();
+    for (int i = 1; i < NUM_ROWS; i += 2) {
+      validDocIds.add(i);
+    }
+    List<GenericRow> oddOutputRows = new ArrayList<>();
+    try (CompactedPinotSegmentRecordReader compactedReader = new 
CompactedPinotSegmentRecordReader(_segmentIndexDir,
+        validDocIds, DELETE_COLUMN)) {
+      while (compactedReader.hasNext()) {
+        oddOutputRows.add(compactedReader.next());
+      }
+    }
+
+    Assert.assertEquals(evenOutputRows.size(), 0, "All even rows are deleted");
+    Assert.assertEquals(oddOutputRows.size(), NUM_ROWS / 2, "All odd rows are 
kept");
+    for (int i = 0; i < oddOutputRows.size(); i++) {
+      GenericRow outputRow = oddOutputRows.get(i);
+      GenericRow row = _rows.get(i * 2 + 1);
+      Assert.assertEquals(outputRow.getValue(D_SV_1), row.getValue(D_SV_1));
+      
Assert.assertTrue(PinotSegmentUtil.compareMultiValueColumn(outputRow.getValue(D_MV_1),
 row.getValue(D_MV_1)));
+      Assert.assertEquals(outputRow.getValue(M1), row.getValue(M1));
+      Assert.assertEquals(outputRow.getValue(M2), row.getValue(M2));
+      Assert.assertEquals(outputRow.getValue(TIME), row.getValue(TIME));
+    }
+  }
+
   @AfterClass
   public void cleanup() {
     FileUtils.deleteQuietly(new File(_segmentOutputDir));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to