This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 21f3d283d4 Allow server level configuration for Upsert metadata class (#11851)
21f3d283d4 is described below

commit 21f3d283d42e1b4f5b16f3ef84549eb34b8b7031
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Wed Jan 17 16:40:47 2024 +0530

    Allow server level configuration for Upsert metadata class (#11851)
    
    * Allow server level configuration for Upsert metadata class
    
    * Fix null pointer exceptions
    
    * Add tests
    
    * Fix tests
    
    * Fix tests
    
    * Fix tests
    
    * Move logic to parse upsert configs from instance data manager to table upsert factory
    
    * Address review comments
    
    ---------
    
    Co-authored-by: Kartik Khare <kharekar...@kartiks-macbook-pro.tail8a064.ts.net>
---
 .../manager/realtime/RealtimeTableDataManager.java |   8 +-
 .../tests/UpsertTableIntegrationTest.java          | 121 ++++++++++++++++-----
 .../UpsertTableSegmentPreloadIntegrationTest.java  |  11 +-
 .../models/DummyTableUpsertMetadataManager.java    | 115 ++++++++++++++++++++
 .../upsert/BaseTableUpsertMetadataManager.java     |   1 +
 .../upsert/TableUpsertMetadataManagerFactory.java  |  28 ++++-
 .../MutableSegmentImplUpsertComparisonColTest.java |   3 +-
 .../mutable/MutableSegmentImplUpsertTest.java      |   3 +-
 .../helix/HelixInstanceDataManagerConfig.java      |   7 ++
 .../config/instance/InstanceDataManagerConfig.java |   2 +
 10 files changed, 265 insertions(+), 34 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 60cd58199f..333be09b0c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -207,7 +207,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", _tableNameWithType);
       // NOTE: Set _tableUpsertMetadataManager before initializing it because 
when preloading is enabled, we need to
       //       load segments into it
-      _tableUpsertMetadataManager = 
TableUpsertMetadataManagerFactory.create(tableConfig);
+      _tableUpsertMetadataManager = 
TableUpsertMetadataManagerFactory.create(tableConfig,
+          
_tableDataManagerConfig.getInstanceDataManagerConfig().getUpsertConfigs());
       _tableUpsertMetadataManager.init(tableConfig, schema, this, 
_helixManager, _segmentPreloadExecutor);
     }
 
@@ -697,6 +698,11 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     return _instanceId;
   }
 
+  @VisibleForTesting
+  public TableUpsertMetadataManager getTableUpsertMetadataManager() {
+    return _tableUpsertMetadataManager;
+  }
+
   /**
    * Validate a schema against the table config for real-time record 
consumption.
    * Ideally, we should validate these things when schema is added or table is 
created, but either of these
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index e140020a39..634390effc 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -18,20 +18,36 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.client.ResultSet;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import 
org.apache.pinot.integration.tests.models.DummyTableUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -82,8 +98,9 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     addSchema(schema);
 
     Map<String, String> csvDecoderProperties = 
getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
-    TableConfig tableConfig = createCSVUpsertTableConfig(getTableName(), 
getKafkaTopic(),
-        getNumKafkaPartitions(), csvDecoderProperties, null, PRIMARY_KEY_COL);
+    TableConfig tableConfig =
+        createCSVUpsertTableConfig(getTableName(), getKafkaTopic(), 
getNumKafkaPartitions(), csvDecoderProperties, null,
+            PRIMARY_KEY_COL);
     addTableConfig(tableConfig);
 
     // Wait for all documents loaded
@@ -136,8 +153,7 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
 
   private Schema createSchema(String schemaFileName)
       throws IOException {
-    InputStream inputStream =
-        
BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName);
+    InputStream inputStream = 
BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName);
     Assert.assertNotNull(inputStream);
     return Schema.fromInputStream(inputStream);
   }
@@ -181,8 +197,9 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     Schema upsertSchema = createSchema();
     upsertSchema.setSchemaName(tableName);
     addSchema(upsertSchema);
-    TableConfig tableConfig = createCSVUpsertTableConfig(tableName, 
kafkaTopicName,
-        getNumKafkaPartitions(), csvDecoderProperties, upsertConfig, 
PRIMARY_KEY_COL);
+    TableConfig tableConfig =
+        createCSVUpsertTableConfig(tableName, kafkaTopicName, 
getNumKafkaPartitions(), csvDecoderProperties,
+            upsertConfig, PRIMARY_KEY_COL);
     addTableConfig(tableConfig);
 
     // Push initial 10 upsert records - 3 pks 100, 101 and 102
@@ -222,9 +239,8 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     Assert.assertEquals(rs.getString(0, playerIdColumnIndex), "101");
 
     // Validate deleted records
-    rs = getPinotConnection()
-        .execute("SELECT playerId FROM " + tableName
-            + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
+    rs = getPinotConnection().execute(
+        "SELECT playerId FROM " + tableName + " WHERE deleted = true 
OPTION(skipUpsert=true)").getResultSet(0);
     Assert.assertEquals(rs.getRowCount(), 2);
     for (int i = 0; i < rs.getRowCount(); i++) {
       String playerId = rs.getString(i, 0);
@@ -245,17 +261,16 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     }, 100L, 600_000L, "Failed to load all upsert records for 
testDeleteWithFullUpsert");
 
     // Validate: pk is queryable and all columns are overwritten with new value
-    rs = getPinotConnection()
-        .execute("SELECT playerId, name, game FROM " + tableName + " WHERE 
playerId = 100").getResultSet(0);
+    rs = getPinotConnection().execute("SELECT playerId, name, game FROM " + 
tableName + " WHERE playerId = 100")
+        .getResultSet(0);
     Assert.assertEquals(rs.getRowCount(), 1);
     Assert.assertEquals(rs.getInt(0, 0), 100);
     Assert.assertEquals(rs.getString(0, 1), "Zook-New");
     Assert.assertEquals(rs.getString(0, 2), "null");
 
     // Validate: pk lineage still exists
-    rs = getPinotConnection()
-        .execute("SELECT playerId, name FROM " + tableName
-            + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0);
+    rs = getPinotConnection().execute(
+        "SELECT playerId, name FROM " + tableName + " WHERE playerId = 100 
OPTION(skipUpsert=true)").getResultSet(0);
 
     Assert.assertTrue(rs.getRowCount() > 1);
 
@@ -269,8 +284,8 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     final UpsertConfig upsertConfig = new 
UpsertConfig(UpsertConfig.Mode.PARTIAL);
     upsertConfig.setDeleteRecordColumn(DELETE_COL);
 
-    testDeleteWithPartialUpsert(getKafkaTopic() + 
"-partial-upsert-with-deletes",
-        "gameScoresPartialUpsertWithDelete", upsertConfig);
+    testDeleteWithPartialUpsert(getKafkaTopic() + 
"-partial-upsert-with-deletes", "gameScoresPartialUpsertWithDelete",
+        upsertConfig);
   }
 
   protected void testDeleteWithPartialUpsert(String kafkaTopicName, String 
tableName, UpsertConfig upsertConfig)
@@ -288,8 +303,9 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     Schema partialUpsertSchema = createSchema(PARTIAL_UPSERT_TABLE_SCHEMA);
     partialUpsertSchema.setSchemaName(tableName);
     addSchema(partialUpsertSchema);
-    TableConfig tableConfig = createCSVUpsertTableConfig(tableName, 
kafkaTopicName,
-        getNumKafkaPartitions(), csvDecoderProperties, upsertConfig, 
PRIMARY_KEY_COL);
+    TableConfig tableConfig =
+        createCSVUpsertTableConfig(tableName, kafkaTopicName, 
getNumKafkaPartitions(), csvDecoderProperties,
+            upsertConfig, PRIMARY_KEY_COL);
     addTableConfig(tableConfig);
 
     // Push initial 10 upsert records - 3 pks 100, 101 and 102
@@ -329,9 +345,8 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     Assert.assertEquals(rs.getString(0, playerIdColumnIndex), "101");
 
     // Validate deleted records
-    rs = getPinotConnection()
-        .execute("SELECT playerId FROM " + tableName
-            + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
+    rs = getPinotConnection().execute(
+        "SELECT playerId FROM " + tableName + " WHERE deleted = true 
OPTION(skipUpsert=true)").getResultSet(0);
     Assert.assertEquals(rs.getRowCount(), 2);
     for (int i = 0; i < rs.getRowCount(); i++) {
       String playerId = rs.getString(i, 0);
@@ -352,22 +367,74 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     }, 100L, 600_000L, "Failed to load all upsert records for 
testDeleteWithFullUpsert");
 
     // Validate: pk is queryable and all columns are overwritten with new value
-    rs = getPinotConnection()
-        .execute("SELECT playerId, name, game FROM " + tableName
-            + " WHERE playerId = 100").getResultSet(0);
+    rs = getPinotConnection().execute("SELECT playerId, name, game FROM " + 
tableName + " WHERE playerId = 100")
+        .getResultSet(0);
     Assert.assertEquals(rs.getRowCount(), 1);
     Assert.assertEquals(rs.getInt(0, 0), 100);
     Assert.assertEquals(rs.getString(0, 1), "Zook");
     Assert.assertEquals(rs.getString(0, 2), "[\"null\"]");
 
     // Validate: pk lineage still exists
-    rs = getPinotConnection()
-        .execute("SELECT playerId, name FROM " + tableName
-            + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0);
+    rs = getPinotConnection().execute(
+        "SELECT playerId, name FROM " + tableName + " WHERE playerId = 100 
OPTION(skipUpsert=true)").getResultSet(0);
 
     Assert.assertTrue(rs.getRowCount() > 1);
 
     // TEARDOWN
     dropRealtimeTable(tableName);
   }
+
+  @Test
+  public void testDefaultMetadataManagerClass()
+      throws Exception {
+    PinotConfiguration config = getServerConf(12345);
+    
config.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
+            HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+            
TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_METADATA_MANAGER_CLASS),
+        DummyTableUpsertMetadataManager.class.getName());
+
+    BaseServerStarter serverStarter = null;
+    try {
+      serverStarter = startOneServer(config);
+      HelixManager helixManager = 
serverStarter.getServerInstance().getHelixManager();
+      InstanceConfig instanceConfig = 
HelixHelper.getInstanceConfig(helixManager, serverStarter.getInstanceId());
+      // updateInstanceTags
+      String tagsString = "DummyTag_REALTIME,DummyTag_OFFLINE";
+      List<String> newTags = Arrays.asList(StringUtils.split(tagsString, ','));
+      
instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(),
 newTags);
+
+      if 
(!_helixDataAccessor.setProperty(_helixDataAccessor.keyBuilder().instanceConfig(serverStarter.getInstanceId()),
+          instanceConfig)) {
+        throw new RuntimeException("Failed to set instance config for 
instance: " + serverStarter.getInstanceId());
+      }
+
+      String dummyTableName = "dummyTable123";
+      Map<String, String> csvDecoderProperties = 
getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
+
+      TableConfig tableConfig =
+          createCSVUpsertTableConfig(dummyTableName, getKafkaTopic(), 
getNumKafkaPartitions(), csvDecoderProperties,
+              null, PRIMARY_KEY_COL);
+
+      TenantConfig tenantConfig = new 
TenantConfig(TagNameUtils.DEFAULT_TENANT_NAME, "DummyTag", null);
+
+      tableConfig.setTenantConfig(tenantConfig);
+      Schema schema = createSchema();
+      schema.setSchemaName(dummyTableName);
+      addSchema(schema);
+      addTableConfig(tableConfig);
+
+      Thread.sleep(1000L);
+      RealtimeTableDataManager tableDataManager =
+          (RealtimeTableDataManager) 
serverStarter.getServerInstance().getInstanceDataManager()
+              
.getTableDataManager(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(dummyTableName));
+      Assert.assertTrue(tableDataManager.getTableUpsertMetadataManager() 
instanceof DummyTableUpsertMetadataManager);
+      dropRealtimeTable(dummyTableName);
+      deleteSchema(dummyTableName);
+      waitForEVToDisappear(dummyTableName);
+    } finally {
+      if (serverStarter != null) {
+        serverStarter.stop();
+      }
+    }
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index b88f5fbdab..f10a58bf72 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.google.common.base.Joiner;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
@@ -26,8 +27,10 @@ import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -87,8 +90,6 @@ public class UpsertTableSegmentPreloadIntegrationTest extends 
BaseClusterIntegra
     TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), 
PRIMARY_KEY_COL, null, getNumKafkaPartitions());
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     assertNotNull(upsertConfig);
-    upsertConfig.setEnableSnapshot(true);
-    upsertConfig.setEnablePreload(true);
     addTableConfig(tableConfig);
 
     // Create and upload segments
@@ -104,6 +105,12 @@ public class UpsertTableSegmentPreloadIntegrationTest 
extends BaseClusterIntegra
   protected void overrideServerConf(PinotConfiguration serverConf) {
     
serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX
 + ".max.segment.preload.threads",
         "1");
+    
serverConf.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
+        HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+        TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_ENABLE_SNAPSHOT), 
"true");
+    
serverConf.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
+        HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+        TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_ENABLE_PRELOAD), 
"true");
   }
 
   @AfterClass
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
new file mode 100644
index 0000000000..502834a6b6
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.models;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import 
org.apache.pinot.segment.local.upsert.BasePartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.BaseTableUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.RecordInfo;
+import org.apache.pinot.segment.local.upsert.UpsertContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class DummyTableUpsertMetadataManager extends 
BaseTableUpsertMetadataManager {
+
+  private TableConfig _tableConfig;
+  private Schema _schema;
+
+  public DummyTableUpsertMetadataManager() {
+    super();
+  }
+
+  @Override
+  public void init(TableConfig tableConfig, Schema schema, TableDataManager 
tableDataManager, HelixManager helixManager,
+      @org.jetbrains.annotations.Nullable ExecutorService 
segmentPreloadExecutor) {
+    super.init(tableConfig, schema, tableDataManager, helixManager, 
segmentPreloadExecutor);
+    _tableConfig = tableConfig;
+    _schema = schema;
+  }
+
+  @Override
+  public PartitionUpsertMetadataManager getOrCreatePartitionManager(int 
partitionId) {
+    UpsertContext context = new 
UpsertContext.Builder().setTableConfig(_tableConfig).setSchema(_schema)
+        .setPrimaryKeyColumns(_schema.getPrimaryKeyColumns())
+        
.setComparisonColumns(Collections.singletonList(_tableConfig.getValidationConfig().getTimeColumnName()))
+        .setHashFunction(HashFunction.NONE).setTableIndexDir(new 
File("/tmp/tableIndexDirDummy")).build();
+
+    return new DummyPartitionUpsertMetadataManager("dummy", partitionId, 
context);
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  class DummyPartitionUpsertMetadataManager extends 
BasePartitionUpsertMetadataManager {
+    public DummyPartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, UpsertContext context) {
+      super(tableNameWithType, partitionId, context);
+    }
+
+    @Override
+    protected long getNumPrimaryKeys() {
+      return 0;
+    }
+
+    @Override
+    protected void addOrReplaceSegment(ImmutableSegmentImpl segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
+        @org.jetbrains.annotations.Nullable ThreadSafeMutableRoaringBitmap 
queryableDocIds,
+        Iterator<RecordInfo> recordInfoIterator, 
@org.jetbrains.annotations.Nullable IndexSegment oldSegment,
+        @org.jetbrains.annotations.Nullable MutableRoaringBitmap 
validDocIdsForOldSegment) {
+    }
+
+    @Override
+    protected boolean doAddRecord(MutableSegment segment, RecordInfo 
recordInfo) {
+      return false;
+    }
+
+    @Override
+    protected void removeSegment(IndexSegment segment, MutableRoaringBitmap 
validDocIds) {
+    }
+
+    @Override
+    protected GenericRow doUpdateRecord(GenericRow record, RecordInfo 
recordInfo) {
+      return null;
+    }
+
+    @Override
+    protected void doRemoveExpiredPrimaryKeys() {
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 845930a54c..72f57d4086 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -116,6 +116,7 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
     initCustomVariables();
 
     if (enableSnapshot && enablePreload && segmentPreloadExecutor != null) {
+
       // Preloading the segments with snapshots for fast upsert metadata 
recovery.
       // Note that there is an implicit waiting logic between the thread doing 
the segment preloading here and the
       // other helix threads about to process segment state transitions (e.g. 
taking segments from OFFLINE to ONLINE).
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index 1750aaefc9..1ee4a834e1 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -19,9 +19,11 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,20 +33,42 @@ public class TableUpsertMetadataManagerFactory {
   }
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
+  public static final String UPSERT_DEFAULT_METADATA_MANAGER_CLASS = 
"default.metadata.manager.class";
+  public static final String UPSERT_DEFAULT_ENABLE_SNAPSHOT = 
"default.enable.snapshot";
+  public static final String UPSERT_DEFAULT_ENABLE_PRELOAD = 
"default.enable.preload";
 
-  public static TableUpsertMetadataManager create(TableConfig tableConfig) {
+  public static TableUpsertMetadataManager create(TableConfig tableConfig,
+      @Nullable PinotConfiguration instanceUpsertConfigs) {
     String tableNameWithType = tableConfig.getTableName();
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     Preconditions.checkArgument(upsertConfig != null, "Must provide upsert 
config for table: %s", tableNameWithType);
 
     TableUpsertMetadataManager metadataManager;
     String metadataManagerClass = upsertConfig.getMetadataManagerClass();
+
+    if (instanceUpsertConfigs != null) {
+      if (metadataManagerClass == null) {
+        metadataManagerClass = 
instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_METADATA_MANAGER_CLASS);
+      }
+      // Server level config honoured only when table level config is not set 
to true
+      if (!upsertConfig.isEnableSnapshot()) {
+        upsertConfig.setEnableSnapshot(
+            
Boolean.parseBoolean(instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_ENABLE_SNAPSHOT,
 "false")));
+      }
+
+      // Server level config honoured only when table level config is not set 
to true
+      if (!upsertConfig.isEnablePreload()) {
+        upsertConfig.setEnablePreload(
+            
Boolean.parseBoolean(instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_ENABLE_PRELOAD,
 "false")));
+      }
+    }
+
     if (StringUtils.isNotEmpty(metadataManagerClass)) {
       LOGGER.info("Creating TableUpsertMetadataManager with class: {} for 
table: {}", metadataManagerClass,
           tableNameWithType);
       try {
         metadataManager =
-            (TableUpsertMetadataManager) 
Class.forName(metadataManagerClass).getConstructor().newInstance();
+            (TableUpsertMetadataManager) 
Class.forName(metadataManagerClass).newInstance();
       } catch (Exception e) {
         throw new RuntimeException(
             String.format("Caught exception while constructing 
TableUpsertMetadataManager with class: %s for table: %s",
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index 38bc66ade5..d3508b7540 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -86,7 +86,8 @@ public class MutableSegmentImplUpsertComparisonColTest {
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
-    TableUpsertMetadataManager tableUpsertMetadataManager = 
TableUpsertMetadataManagerFactory.create(_tableConfig);
+    TableUpsertMetadataManager tableUpsertMetadataManager =
+        TableUpsertMetadataManagerFactory.create(_tableConfig, null);
     tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager, 
mock(HelixManager.class),
         mock(ExecutorService.class));
     _partitionUpsertMetadataManager = 
tableUpsertMetadataManager.getOrCreatePartitionManager(0);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index 702bd47f80..4f14bdd408 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -96,7 +96,8 @@ public class MutableSegmentImplUpsertTest {
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
-    TableUpsertMetadataManager tableUpsertMetadataManager = 
TableUpsertMetadataManagerFactory.create(_tableConfig);
+    TableUpsertMetadataManager tableUpsertMetadataManager =
+        TableUpsertMetadataManagerFactory.create(_tableConfig, null);
     tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager, 
mock(HelixManager.class),
         mock(ExecutorService.class));
     _partitionUpsertMetadataManager = 
tableUpsertMetadataManager.getOrCreatePartitionManager(0);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 9c69e8afd4..c7e15fe106 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -131,6 +131,8 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   private static final String EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = 
"external.view.dropped.max.wait.ms";
   private static final String EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS = 
"external.view.dropped.check.interval.ms";
 
+  public static final String PREFIX_OF_CONFIG_OF_UPSERT = "upsert";
+
   private final static String[] REQUIRED_KEYS = {INSTANCE_ID};
   private static final long DEFAULT_ERROR_CACHE_SIZE = 100L;
   private static final int DEFAULT_DELETED_SEGMENTS_CACHE_SIZE = 10_000;
@@ -294,6 +296,11 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
         DEFAULT_EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS);
   }
 
+  @Override
+  public PinotConfiguration getUpsertConfigs() {
+    return 
_instanceDataManagerConfiguration.subset(PREFIX_OF_CONFIG_OF_UPSERT);
+  }
+
   @Override
   public String toString() {
     String configString = "";
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index a97f1639e1..6d035890c9 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -68,4 +68,6 @@ public interface InstanceDataManagerConfig {
   long getExternalViewDroppedMaxWaitMs();
 
   long getExternalViewDroppedCheckIntervalMs();
+
+  PinotConfiguration getUpsertConfigs();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to