Copilot commented on code in PR #17458:
URL: https://github.com/apache/pinot/pull/17458#discussion_r2674761721


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.config.table.TableConfig;
+import 
org.apache.pinot.spi.utils.CommonConstants.ConfigChangeListenerConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Singleton class to manage the configuration for force commit and reload on 
consuming segments
+ * for upsert tables with inconsistent state configurations (partial upsert or 
dropOutOfOrderRecord=true
+ * with consistency mode NONE and replication > 1).
+ *
+ * This configuration is dynamically updatable via ZK cluster config without 
requiring a server restart.
+ */
+public class UpsertInconsistentStateConfig implements 
PinotClusterConfigChangeListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertInconsistentStateConfig.class);
+  private static final UpsertInconsistentStateConfig INSTANCE = new 
UpsertInconsistentStateConfig();
+
+  private final AtomicBoolean _forceCommitReloadEnabled =
+      new 
AtomicBoolean(ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD);
+
+  private UpsertInconsistentStateConfig() {
+  }
+
+  public static UpsertInconsistentStateConfig getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Checks if force commit/reload is allowed for the given table config.
+   */
+  public boolean isForceCommitReloadAllowed(TableConfig tableConfig) {
+    return (_forceCommitReloadEnabled.get() || 
!TableConfigUtils.checkForInconsistentStateConfigs(tableConfig));
+  }
+
+  /**
+   * Returns the current config key used for this setting.
+   */
+  public String getConfigKey() {
+    return ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG;
+  }
+
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+    if 
(!changedConfigs.contains(ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG))
 {
+      return;
+    }
+
+    String configValue = 
clusterConfigs.get(ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG);
+    boolean enabled = (configValue == null)
+        ? ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD
+        : Boolean.parseBoolean(configValue);
+
+    boolean previousValue = _forceCommitReloadEnabled.getAndSet(enabled);
+    if (previousValue != enabled) {
+      LOGGER.info("Updated cluster config: {} from {} to {}", 
ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG,
+          previousValue, enabled);

Review Comment:
   The variable name `enabled` is misleading. This boolean represents whether 
force commit/reload is *allowed*, not whether the feature is *enabled*. 
Consider renaming to `forceCommitReloadAllowed` for clarity.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.config.table.TableConfig;
+import 
org.apache.pinot.spi.utils.CommonConstants.ConfigChangeListenerConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Singleton class to manage the configuration for force commit and reload on 
consuming segments
+ * for upsert tables with inconsistent state configurations (partial upsert or 
dropOutOfOrderRecord=true
+ * with consistency mode NONE and replication > 1).
+ *
+ * This configuration is dynamically updatable via ZK cluster config without 
requiring a server restart.
+ */
+public class UpsertInconsistentStateConfig implements 
PinotClusterConfigChangeListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertInconsistentStateConfig.class);
+  private static final UpsertInconsistentStateConfig INSTANCE = new 
UpsertInconsistentStateConfig();
+
+  private final AtomicBoolean _forceCommitReloadEnabled =
+      new 
AtomicBoolean(ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD);
+
+  private UpsertInconsistentStateConfig() {
+  }
+
+  public static UpsertInconsistentStateConfig getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Checks if force commit/reload is allowed for the given table config.
+   */
+  public boolean isForceCommitReloadAllowed(TableConfig tableConfig) {

Review Comment:
   Potential NullPointerException if `tableConfig` is null. The method 
`TableConfigUtils.checkForInconsistentStateConfigs` no longer performs null 
checking and will throw an NPE if `tableConfig` is null. Add a null check before 
calling this method.
   ```suggestion
     public boolean isForceCommitReloadAllowed(TableConfig tableConfig) {
       if (tableConfig == null) {
         // When tableConfig is null, rely solely on the cluster-level flag to 
avoid NPE.
         return _forceCommitReloadEnabled.get();
       }
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -1605,10 +1605,10 @@ public static void checkForDuplicates(List<String> 
columns) {
   }
 
   public static boolean checkForInconsistentStateConfigs(TableConfig 
tableConfig) {
-    return tableConfig != null && tableConfig.getUpsertConfig() != null && 
tableConfig.getReplication() > 1 && (
-        tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL 
|| (
-            tableConfig.getUpsertConfig().isDropOutOfOrderRecord()
-                && tableConfig.getUpsertConfig().getConsistencyMode() == 
UpsertConfig.ConsistencyMode.NONE));
+    return tableConfig.getUpsertConfig() != null && 
tableConfig.getReplication() > 1 && (

Review Comment:
   The null check for the `tableConfig` parameter was removed. The method 
`checkForInconsistentStateConfigs` can receive null, which will cause a 
NullPointerException. The previous implementation checked `tableConfig != null` 
before accessing its methods.
   ```suggestion
       return tableConfig != null && tableConfig.getUpsertConfig() != null && 
tableConfig.getReplication() > 1 && (
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2573,17 +2573,20 @@ public PauseState resumeTopicsConsumption(String 
tableNameWithType, List<Integer
 
   private void sendForceCommitMessageToServers(String tableNameWithType, 
Set<String> consumingSegments) {
     // For partial-upsert tables or upserts with out-of-order events enabled, 
force-committing
-    // consuming segments is disabled. In some cases (especially when 
replication > 1), the
-    // server that consumed fewer rows was incorrectly selected as the winner, 
causing other
-    // servers to reconsume rows and resulting in inconsistent data when 
previous state must
-    // be referenced for add/update operations.
-    // TODO: Temporarily disabled until a proper fix is implemented.
+    // consuming segments is disabled by default. In some cases (especially 
when replication > 1),
+    // the server that consumed fewer rows was incorrectly selected as the 
winner, causing other
+    // servers to reconsume rows and resulting in inconsistent data.
+    // This behavior can be controlled via cluster config without requiring 
server restart.
     TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
-    if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+    if (tableConfig == null) {
+      throw new IllegalStateException("Table config not found for table: " + 
tableNameWithType);
+    }
+    if 
(!UpsertInconsistentStateConfig.getInstance().isForceCommitReloadAllowed(tableConfig))
 {
       throw new IllegalStateException(
-          "Force commit is not allowed when replication > 1 for partial-upsert 
tables, or for upsert tables"
-              + " when dropOutOfOrder is enabled with consistency mode: " + 
UpsertConfig.ConsistencyMode.NONE
-              + " for the table: " + tableNameWithType);
+          "Force commit disabled for table: " + tableNameWithType
+              + ". Table is configured as partial upsert or 
dropOutOfOrderRecord=true with replication > 1, "
+              + "which can cause data inconsistency during force commit. "
+              + "To override, set cluster config: " + 
UpsertInconsistentStateConfig.getInstance().getConfigKey());

Review Comment:
   The error message states 'Force commit disabled' but doesn't indicate 
whether the config is currently enabled or disabled. Consider adding the 
current config value to help operators understand if they need to enable or 
disable the setting.



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -350,6 +349,8 @@ public void 
testForceCommitWithNonConsumingSegmentsIsIgnored() {
     FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
     segmentManager._numReplicas = 1;
     segmentManager.makeTableConfig();
+    
when(segmentManager._mockResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(
+        segmentManager._tableConfig);

Review Comment:
   The removed tests (`testForceCommitUpsertWithOutOfOrderTable`, 
`testForceCommitPartialUpsertTableWithMultipleReplicas`, 
`testForceCommitPartialUpsertTableWithNoReplica`) verified important behavior 
for upsert tables with different configurations. Since the behavior is now 
configurable rather than hard-blocked, new tests should be added to verify: (1) 
force commit succeeds when config is enabled, (2) force commit fails when 
config is disabled for inconsistent state tables, and (3) the config change 
listener correctly updates the behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to