Copilot commented on code in PR #17503:
URL: https://github.com/apache/pinot/pull/17503#discussion_r2922296659


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -192,8 +213,62 @@ protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryK
       _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
           (pk, recordLocation) -> {
             if (recordLocation.getSegment() == segment) {
+              if (_context.isTableTypeInconsistentDuringConsumption() && segment instanceof MutableSegment) {
+                _previousKeyToRecordLocationMap.remove(pk);
+              }
+              return null;
+            }
+            return recordLocation;
+          });
+    }
+  }
+
+  @Override
+  protected void revertAndRemoveSegment(IndexSegment segment,
+      Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
+    while (primaryKeyIterator.hasNext()) {
+      Map.Entry<Integer, PrimaryKey> primaryKeyEntry = primaryKeyIterator.next();
+      PrimaryKey primaryKey = primaryKeyEntry.getValue();
+      int docId = primaryKeyEntry.getKey();
+      _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
+          (pk, recordLocation) -> {
+            RecordLocation prevLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+            if (prevLocation == null) {
+              _previousKeyToRecordLocationMap.remove(primaryKey);
+              return null;
Review Comment:
   `revertAndRemoveSegment()` looks up/removes entries in 
`_previousKeyToRecordLocationMap` using the un-hashed `PrimaryKey` object 
(`get(primaryKey)` / `remove(primaryKey)`), but this map is keyed by the hashed 
primary-key object used in `_primaryKeyToRecordLocationMap.compute(...)`. As a 
result, `prevLocation` will almost always be null and the code will incorrectly 
remove primary keys from the upsert metadata instead of reverting, and will 
also leak entries in `_previousKeyToRecordLocationMap`. Use the same hashed key 
(`pk`) for get/remove operations.
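   To make the failure mode concrete, here is a minimal standalone sketch of the key-type mismatch. The Pinot classes (`PrimaryKey`, `HashUtils`) are replaced with simplified stand-ins, so this is an illustration of the map-key behavior only, not the actual Pinot code:

   ```java
   import java.util.concurrent.ConcurrentHashMap;

   // Standalone sketch: the previous-location map is keyed by the hashed key
   // object, so a lookup with the raw PrimaryKey always misses.
   public class HashedKeyMismatch {
     // Simplified stand-in for Pinot's PrimaryKey.
     record PrimaryKey(String value) {
     }

     // Stand-in for HashUtils.hashPrimaryKey(primaryKey, hashFunction): the real
     // method can return a different object type (e.g. a hash wrapper) that
     // never equals the raw PrimaryKey.
     static Object hashPrimaryKey(PrimaryKey primaryKey) {
       return "hash:" + primaryKey.value().hashCode();
     }

     public static void main(String[] args) {
       ConcurrentHashMap<Object, String> previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
       PrimaryKey primaryKey = new PrimaryKey("user-1");
       Object pk = hashPrimaryKey(primaryKey);  // the key used inside compute(...)

       previousKeyToRecordLocationMap.put(pk, "prevLocation");

       // Buggy lookup with the un-hashed key: always misses.
       System.out.println(previousKeyToRecordLocationMap.get(primaryKey));  // null
       // Correct lookup with the same hashed key object:
       System.out.println(previousKeyToRecordLocationMap.get(pk));          // prevLocation
     }
   }
   ```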



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -1795,15 +1795,13 @@ public static void checkForDuplicates(List<String> columns) {
    * @param tableConfig the table config to check, may be null
    * @return true if the table has inconsistent state configs, false if tableConfig is null or no issues found
    */
-  public static boolean checkForInconsistentStateConfigs(@Nullable TableConfig tableConfig) {
+  public static boolean isTableTypeInconsistentDuringConsumption(@Nullable TableConfig tableConfig) {
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     if (upsertConfig == null) {
       return false;
     }
-    return tableConfig.getReplication() > 1 && (
-        upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL
-            || (upsertConfig.isDropOutOfOrderRecord()
-            && upsertConfig.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE));
+    return (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL || upsertConfig.isDropOutOfOrderRecord()
+        || upsertConfig.getOutOfOrderRecordColumn() != null);

Review Comment:
   `isTableTypeInconsistentDuringConsumption(@Nullable TableConfig 
tableConfig)` dereferences `tableConfig` without a null check 
(`tableConfig.getUpsertConfig()`), but the method contract/Javadoc says the 
argument may be null. This can cause an immediate NPE in callers that pass null 
table configs. Add an explicit `if (tableConfig == null) { return false; }` 
guard (or tighten the signature/Javadoc).
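   A standalone sketch of the suggested guard; `TableConfig`/`UpsertConfig` here are simplified stand-ins for the Pinot classes in the diff above, kept only to show the null-handling contract:

   ```java
   // Standalone sketch: honor the @Nullable contract by returning false for a
   // null table config instead of throwing NPE on tableConfig.getUpsertConfig().
   public class NullGuardSketch {
     enum Mode { FULL, PARTIAL }

     // Simplified stand-ins for the Pinot config classes.
     record UpsertConfig(Mode mode, boolean dropOutOfOrderRecord, String outOfOrderRecordColumn) {
     }

     record TableConfig(UpsertConfig upsertConfig) {
     }

     static boolean isTableTypeInconsistentDuringConsumption(TableConfig tableConfig) {
       if (tableConfig == null) {
         return false;  // the Javadoc says the argument may be null
       }
       UpsertConfig upsertConfig = tableConfig.upsertConfig();
       if (upsertConfig == null) {
         return false;
       }
       return upsertConfig.mode() == Mode.PARTIAL || upsertConfig.dropOutOfOrderRecord()
           || upsertConfig.outOfOrderRecordColumn() != null;
     }

     public static void main(String[] args) {
       // Null config no longer NPEs:
       System.out.println(isTableTypeInconsistentDuringConsumption(null));  // false
       // Partial-upsert table is flagged:
       System.out.println(isTableTypeInconsistentDuringConsumption(
           new TableConfig(new UpsertConfig(Mode.PARTIAL, false, null))));  // true
     }
   }
   ```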



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java:
##########
@@ -271,32 +322,87 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
   }
 
   @Override
-  protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
-    int removedKeys = 0;
-    for (Map.Entry<Object, RecordLocation> entry : _newlyAddedKeys.entrySet()) {
-      if (entry.getValue().getSegment() == oldSegment) {
-        _primaryKeyToRecordLocationMap.remove(entry.getKey());
-        removeDocId(oldSegment, entry.getValue().getDocId());
-        removedKeys++;
-      }
+  protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+    // We need to decrease the distinctSegmentCount for each unique primary key in this deleting segment by 1
+    // as the occurrence of the key in this segment is being removed. We are taking a set of unique primary keys
+    // to avoid double counting the same key in the same segment.
+    Set<Object> uniquePrimaryKeys = new HashSet<>();
+    while (primaryKeyIterator.hasNext()) {
+      PrimaryKey primaryKey = primaryKeyIterator.next();
+      _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
+          (pk, recordLocation) -> {
+            if (recordLocation.getSegment() == segment) {
+              if (_context.isTableTypeInconsistentDuringConsumption() && segment instanceof MutableSegment) {
+                _previousKeyToRecordLocationMap.remove(pk);
+              }
+              return null;
+            }
+            if (!uniquePrimaryKeys.add(pk)) {
+              return recordLocation;
+            }
+            return new RecordLocation(recordLocation.getSegment(), recordLocation.getDocId(),
+                recordLocation.getComparisonValue(),
+                RecordLocation.decrementSegmentCount(recordLocation.getDistinctSegmentCount()));
+          });
     }
-    _logger.info("Removed newly added {} keys for the segment: {} out of : {}", removedKeys,
-        oldSegment.getSegmentName(), _previousKeyToRecordLocationMap.size());
   }
 
   @Override
-  protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+  protected void revertAndRemoveSegment(IndexSegment segment,
+      Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
     // We need to decrease the distinctSegmentCount for each unique primary key in this deleting segment by 1
     // as the occurrence of the key in this segment is being removed. We are taking a set of unique primary keys
     // to avoid double counting the same key in the same segment.
     Set<Object> uniquePrimaryKeys = new HashSet<>();
     while (primaryKeyIterator.hasNext()) {
-      PrimaryKey primaryKey = primaryKeyIterator.next();
+      Map.Entry<Integer, PrimaryKey> primaryKeyEntry = primaryKeyIterator.next();
+      PrimaryKey primaryKey = primaryKeyEntry.getValue();
+      int docId = primaryKeyEntry.getKey();
       _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
           (pk, recordLocation) -> {
-            if (recordLocation.getSegment() == segment) {
+            RecordLocation prevLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+            if (prevLocation == null) {
+              _previousKeyToRecordLocationMap.remove(primaryKey);
               return null;

Review Comment:
   `revertAndRemoveSegment()` accesses `_previousKeyToRecordLocationMap` with 
the `PrimaryKey` instance (`get(primaryKey)` / `remove(primaryKey)`), but the 
map is populated with the hashed key object from 
`_primaryKeyToRecordLocationMap.compute(...)`. This key-type mismatch means 
`prevLocation` will typically be null and the code will drop keys instead of 
reverting metadata (and can leave stale entries in 
`_previousKeyToRecordLocationMap`). Use the hashed key (`pk`) consistently for 
lookups/removals.
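   The fix pattern can be sketched in isolation: inside `computeIfPresent`, the lambda's key parameter is already the hashed key, so it should be reused for the previous-location map. This standalone sketch uses plain maps and strings in place of the Pinot types, and models only the key-consistency point (not the surrounding segment-count bookkeeping):

   ```java
   import java.util.concurrent.ConcurrentHashMap;

   // Standalone sketch of the suggested fix: reuse the hashed map key inside
   // computeIfPresent for the get/remove on the previous-location map.
   public class RevertWithHashedKey {
     public static void main(String[] args) {
       ConcurrentHashMap<Object, String> primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
       ConcurrentHashMap<Object, String> previousKeyToRecordLocationMap = new ConcurrentHashMap<>();

       // Hashed key, as produced by HashUtils.hashPrimaryKey(...) in the real code.
       Object pk = "hash:user-1";
       primaryKeyToRecordLocationMap.put(pk, "currentLocation");
       previousKeyToRecordLocationMap.put(pk, "prevLocation");

       primaryKeyToRecordLocationMap.computeIfPresent(pk, (key, recordLocation) -> {
         // Fixed: look up and remove with the hashed key `key`, not the raw PrimaryKey.
         String prevLocation = previousKeyToRecordLocationMap.remove(key);
         if (prevLocation == null) {
           return null;  // no previous location recorded: drop the entry
         }
         return prevLocation;  // revert the entry to its previous location
       });

       System.out.println(primaryKeyToRecordLocationMap.get(pk));      // prevLocation
       System.out.println(previousKeyToRecordLocationMap.isEmpty());   // true: no leaked entry
     }
   }
   ```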



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2773,21 +2774,24 @@ public PauseState resumeTopicsConsumption(String tableNameWithType, List<Integer
    * Validates that force commit is allowed for the given table.
    * Throws IllegalStateException if force commit is disabled for partial-upsert tables
    * or upsert tables with dropOutOfOrder enabled when replication > 1.
+   * Force commit is always allowed for tables without inconsistent state configs.
    */
   private void validateForceCommitAllowed(String tableNameWithType) {
     TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
     if (tableConfig == null) {
       throw new IllegalStateException("Table config not found for table: " + tableNameWithType);
     }
-    UpsertInconsistentStateConfig configInstance = UpsertInconsistentStateConfig.getInstance();
-    if (!configInstance.isForceCommitReloadAllowed(tableConfig)) {
-      throw new IllegalStateException(
-          "Force commit disabled for table: " + tableNameWithType
-              + ". Table is configured as partial upsert or dropOutOfOrderRecord=true with replication > 1, "
-              + "which can cause data inconsistency during force commit. "
-              + "Current cluster config '" + configInstance.getConfigKey() + "' is set to: "
-              + configInstance.isForceCommitReloadEnabled()
-              + ". To enable force commit, set this config to 'true'.");
+    // Only restrict force commit for tables with inconsistent state configs
+    // (partial upsert or dropOutOfOrder tables with replication > 1)
+    boolean isInconsistentMetadataDuringConsumption =
+        TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
+    ConsumingSegmentConsistencyModeListener configInstance = ConsumingSegmentConsistencyModeListener.getInstance();
+    if (!configInstance.isForceCommitAllowed() && isInconsistentMetadataDuringConsumption) {
+      throw new IllegalStateException("Force commit disabled for table: " + tableNameWithType
+          + ". Table is configured as partial upsert or dropOutOfOrderRecord=true with replication > 1, "
+          + "which can cause data inconsistency during force commit. " + "Current cluster config '"
+          + configInstance.getConfigKey() + "' is set to: " + configInstance.getConsistencyMode()
+          + ". To enable safer force commit, set isTableTypeInconsistentDuringConsumption config to 'PROTECTED'.");

Review Comment:
   The IllegalStateException message suggests setting 
"isTableTypeInconsistentDuringConsumption config" to `PROTECTED`, but 
`isTableTypeInconsistentDuringConsumption` is a method, not a cluster config. 
This makes the guidance misleading; it should instruct users to set the cluster 
config key (`pinot.server.consuming.segment.consistency.mode`, i.e. 
`configInstance.getConfigKey()`) to `PROTECTED`.
   ```suggestion
           + ". To enable safer force commit, set cluster config '" + configInstance.getConfigKey()
           + "' to 'PROTECTED'.");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
