This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bf696ec25 Make upsert metadata manager pluggable (#9186)
6bf696ec25 is described below

commit 6bf696ec25f2571c252c07099b81f9876f839b91
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Aug 12 02:26:46 2022 -0700

    Make upsert metadata manager pluggable (#9186)
---
 .../manager/realtime/RealtimeTableDataManager.java |  26 +-
 ...adataAndDictionaryAggregationPlanMakerTest.java |   4 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |   2 +-
 .../upsert/BaseTableUpsertMetadataManager.java     |  79 ++++
 ...ncurrentMapPartitionUpsertMetadataManager.java} | 124 ++----
 ...> ConcurrentMapTableUpsertMetadataManager.java} |  36 +-
 .../upsert/PartitionUpsertMetadataManager.java     | 430 +--------------------
 .../local/{utils => upsert}/RecordInfo.java        |   5 +-
 .../local/upsert/TableUpsertMetadataManager.java   |  38 +-
 .../upsert/TableUpsertMetadataManagerFactory.java  |  65 ++++
 .../pinot/segment/local/upsert/UpsertUtils.java    |  76 ++++
 .../dedup/PartitionDedupMetadataManagerTest.java   |   2 +-
 .../MutableSegmentImplUpsertComparisonColTest.java |   8 +-
 .../mutable/MutableSegmentImplUpsertTest.java      |   7 +-
 ...rentMapPartitionUpsertMetadataManagerTest.java} |  16 +-
 .../pinot/spi/config/table/UpsertConfig.java       |  24 ++
 16 files changed, 338 insertions(+), 604 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index f51da5195b..af7f366ebd 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -57,9 +57,9 @@ import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
-import org.apache.pinot.segment.local.upsert.PartialUpsertHandler;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
 import org.apache.pinot.segment.local.utils.SchemaUtils;
 import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
@@ -188,26 +188,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
           _tableUpsertMetadataManager);
       Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
       Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", _tableNameWithType);
-
-      List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
-      Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns),
-          "Primary key columns must be configured for upsert");
-
-      String comparisonColumn = upsertConfig.getComparisonColumn();
-      if (comparisonColumn == null) {
-        comparisonColumn = 
tableConfig.getValidationConfig().getTimeColumnName();
-      }
-
-      PartialUpsertHandler partialUpsertHandler = null;
-      if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
-        assert upsertConfig.getPartialUpsertStrategies() != null;
-        partialUpsertHandler = new PartialUpsertHandler(schema, 
upsertConfig.getPartialUpsertStrategies(),
-            upsertConfig.getDefaultPartialUpsertStrategy(), comparisonColumn);
-      }
-
-      _tableUpsertMetadataManager =
-          new TableUpsertMetadataManager(_tableNameWithType, 
primaryKeyColumns, comparisonColumn,
-              upsertConfig.getHashFunction(), partialUpsertHandler, 
_serverMetrics);
+      _tableUpsertMetadataManager = 
TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, 
_serverMetrics);
     }
   }
 
@@ -264,7 +245,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   }
 
   public boolean isPartialUpsertEnabled() {
-    return _tableUpsertMetadataManager != null && 
_tableUpsertMetadataManager.isPartialUpsertEnabled();
+    return _tableUpsertMetadataManager != null
+        && _tableUpsertMetadataManager.getUpsertMode() == 
UpsertConfig.Mode.PARTIAL;
   }
 
   /*
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 4bfb67dfbe..e49b90e657 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -38,7 +38,7 @@ import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import 
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
@@ -125,7 +125,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
     _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME), ReadMode.heap);
     ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
-        new PartitionUpsertMetadataManager("testTable_REALTIME", 0, 
Collections.singletonList("column6"),
+        new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 
0, Collections.singletonList("column6"),
             "daysSinceEpoch", HashFunction.NONE, null, serverMetrics), new 
ThreadSafeMutableRoaringBitmap());
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 775fe06f5c..d6de8d0168 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -62,11 +62,11 @@ import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext
 import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
 import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.RecordInfo;
 import org.apache.pinot.segment.local.utils.FixedIntArrayOffHeapIdMap;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.segment.local.utils.IdMap;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
-import org.apache.pinot.segment.local.utils.RecordInfo;
 import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.MutableSegment;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
new file mode 100644
index 0000000000..95666d3ea2
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+@ThreadSafe
+public abstract class BaseTableUpsertMetadataManager implements 
TableUpsertMetadataManager {
+  protected String _tableNameWithType;
+  protected List<String> _primaryKeyColumns;
+  protected String _comparisonColumn;
+  protected HashFunction _hashFunction;
+  protected PartialUpsertHandler _partialUpsertHandler;
+  protected ServerMetrics _serverMetrics;
+
+  @Override
+  public void init(TableConfig tableConfig, Schema schema, TableDataManager 
tableDataManager,
+      ServerMetrics serverMetrics) {
+    _tableNameWithType = tableConfig.getTableName();
+
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+    Preconditions.checkArgument(upsertConfig != null && upsertConfig.getMode() 
!= UpsertConfig.Mode.NONE,
+        "Upsert must be enabled for table: %s", _tableNameWithType);
+
+    _primaryKeyColumns = schema.getPrimaryKeyColumns();
+    Preconditions.checkArgument(!CollectionUtils.isEmpty(_primaryKeyColumns),
+        "Primary key columns must be configured for upsert enabled table: %s", 
_tableNameWithType);
+
+    _comparisonColumn = upsertConfig.getComparisonColumn();
+    if (_comparisonColumn == null) {
+      _comparisonColumn = 
tableConfig.getValidationConfig().getTimeColumnName();
+    }
+
+    _hashFunction = upsertConfig.getHashFunction();
+
+    if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
+      Map<String, UpsertConfig.Strategy> partialUpsertStrategies = 
upsertConfig.getPartialUpsertStrategies();
+      Preconditions.checkArgument(partialUpsertStrategies != null,
+          "Partial-upsert strategies must be configured for partial-upsert 
enabled table: %s", _tableNameWithType);
+      _partialUpsertHandler =
+          new PartialUpsertHandler(schema, partialUpsertStrategies, 
upsertConfig.getDefaultPartialUpsertStrategy(),
+              _comparisonColumn);
+    }
+
+    _serverMetrics = serverMetrics;
+  }
+
+  @Override
+  public UpsertConfig.Mode getUpsertMode() {
+    return _partialUpsertHandler == null ? UpsertConfig.Mode.FULL : 
UpsertConfig.Mode.PARTIAL;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
similarity index 84%
copy from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index d1042fe5c1..e6890c8a6c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -38,7 +38,6 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.RecordInfo;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -47,7 +46,6 @@ import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.Logger;
@@ -55,32 +53,11 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Manages the upsert metadata per partition.
- * <p>For multiple records with the same comparison value (default to 
timestamp), the manager will preserve the latest
- * record based on the sequence number of the segment. If 2 records with the 
same comparison value are in the same
- * segment, the one with larger doc id will be preserved. Note that for tables 
with sorted column, the records will be
- * re-ordered when committing the segment, and we will use the re-ordered doc 
ids instead of the ingestion doc ids to
- * decide the record to preserve.
- *
- * <p>There will be short term inconsistency when updating the upsert 
metadata, but should be consistent after the
- * operation is done:
- * <ul>
- *   <li>
- *     When updating a new record, it first removes the doc id from the 
current location, then update the new location.
- *   </li>
- *   <li>
- *     When adding a new segment, it removes the doc ids from the current 
locations before the segment being added to
- *     the RealtimeTableDataManager.
- *   </li>
- *   <li>
- *     When replacing an existing segment, after the record location being 
replaced with the new segment, the following
- *     updates applied to the new segment's valid doc ids won't be reflected 
to the replaced segment's valid doc ids.
- *   </li>
- * </ul>
+ * Implementation of {@link PartitionUpsertMetadataManager} that is backed by 
a {@link ConcurrentHashMap}.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 @ThreadSafe
-public class PartitionUpsertMetadataManager {
+public class ConcurrentMapPartitionUpsertMetadataManager implements 
PartitionUpsertMetadataManager {
   private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = 
TimeUnit.MINUTES.toNanos(1);
 
   private final String _tableNameWithType;
@@ -105,9 +82,9 @@ public class PartitionUpsertMetadataManager {
   private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
   private int _numOutOfOrderEvents = 0;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, List<String> primaryKeyColumns,
-      String comparisonColumn, HashFunction hashFunction, @Nullable 
PartialUpsertHandler partialUpsertHandler,
-      ServerMetrics serverMetrics) {
+  public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, 
int partitionId,
+      List<String> primaryKeyColumns, String comparisonColumn, HashFunction 
hashFunction,
+      @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics 
serverMetrics) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _primaryKeyColumns = primaryKeyColumns;
@@ -118,16 +95,12 @@ public class PartitionUpsertMetadataManager {
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + 
"-" + getClass().getSimpleName());
   }
 
-  /**
-   * Returns the primary key columns.
-   */
+  @Override
   public List<String> getPrimaryKeyColumns() {
     return _primaryKeyColumns;
   }
 
-  /**
-   * Initializes the upsert metadata for the given immutable segment.
-   */
+  @Override
   public void addSegment(ImmutableSegment segment) {
     addSegment(segment, null, null);
   }
@@ -154,7 +127,7 @@ public class PartitionUpsertMetadataManager {
         validDocIds = new ThreadSafeMutableRoaringBitmap();
       }
       if (recordInfoIterator == null) {
-        recordInfoIterator = getRecordInfoIterator(segment);
+        recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, 
_primaryKeyColumns, _comparisonColumn);
       }
       addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, 
recordInfoIterator, null, null);
     } finally {
@@ -169,42 +142,6 @@ public class PartitionUpsertMetadataManager {
     _logger.info("Finished adding segment: {}, current primary key count: {}", 
segmentName, numPrimaryKeys);
   }
 
-  private Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment) 
{
-    int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
-    return new Iterator<RecordInfo>() {
-      private int _docId = 0;
-
-      @Override
-      public boolean hasNext() {
-        return _docId < numTotalDocs;
-      }
-
-      @Override
-      public RecordInfo next() {
-        PrimaryKey primaryKey = new PrimaryKey(new 
Object[_primaryKeyColumns.size()]);
-        getPrimaryKey(segment, _docId, primaryKey);
-
-        Object comparisonValue = segment.getValue(_docId, _comparisonColumn);
-        if (comparisonValue instanceof byte[]) {
-          comparisonValue = new ByteArray((byte[]) comparisonValue);
-        }
-        return new RecordInfo(primaryKey, _docId++, (Comparable) 
comparisonValue);
-      }
-    };
-  }
-
-  private void getPrimaryKey(IndexSegment segment, int docId, PrimaryKey 
buffer) {
-    Object[] values = buffer.getValues();
-    int numPrimaryKeyColumns = values.length;
-    for (int i = 0; i < numPrimaryKeyColumns; i++) {
-      Object value = segment.getValue(docId, _primaryKeyColumns.get(i));
-      if (value instanceof byte[]) {
-        value = new ByteArray((byte[]) value);
-      }
-      values[i] = value;
-    }
-  }
-
   private void addOrReplaceSegment(ImmutableSegmentImpl segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
       Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment 
oldSegment,
       @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
@@ -293,9 +230,7 @@ public class PartitionUpsertMetadataManager {
     }
   }
 
-  /**
-   * Updates the upsert metadata for a new consumed record in the given 
consuming segment.
-   */
+  @Override
   public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
     ThreadSafeMutableRoaringBitmap validDocIds = 
Objects.requireNonNull(segment.getValidDocIds());
     
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
@@ -330,9 +265,7 @@ public class PartitionUpsertMetadataManager {
         _primaryKeyToRecordLocationMap.size());
   }
 
-  /**
-   * Replaces the upsert metadata for the old segment with the new immutable 
segment.
-   */
+  @Override
   public void replaceSegment(ImmutableSegment segment, IndexSegment 
oldSegment) {
     replaceSegment(segment, null, null, oldSegment);
   }
@@ -363,7 +296,7 @@ public class PartitionUpsertMetadataManager {
           validDocIds = new ThreadSafeMutableRoaringBitmap();
         }
         if (recordInfoIterator == null) {
-          recordInfoIterator = getRecordInfoIterator(segment);
+          recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, 
_primaryKeyColumns, _comparisonColumn);
         }
         addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, 
recordInfoIterator, oldSegment,
             validDocIdsForOldSegment);
@@ -402,9 +335,7 @@ public class PartitionUpsertMetadataManager {
     _logger.info("Finished replacing segment: {}, current primary key count: 
{}", segmentName, numPrimaryKeys);
   }
 
-  /**
-   * Removes the upsert metadata for the given segment.
-   */
+  @Override
   public void removeSegment(IndexSegment segment) {
     String segmentName = segment.getSegmentName();
     _logger.info("Removing {} segment: {}, current primary key count: {}",
@@ -446,7 +377,7 @@ public class PartitionUpsertMetadataManager {
     PeekableIntIterator iterator = validDocIds.getIntIterator();
     while (iterator.hasNext()) {
       int docId = iterator.next();
-      getPrimaryKey(segment, docId, primaryKey);
+      UpsertUtils.getPrimaryKey(segment, _primaryKeyColumns, docId, 
primaryKey);
       
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey,
 _hashFunction),
           (pk, recordLocation) -> {
             if (recordLocation.getSegment() == segment) {
@@ -457,9 +388,7 @@ public class PartitionUpsertMetadataManager {
     }
   }
 
-  /**
-   * Returns the merged record when partial-upsert is enabled.
-   */
+  @Override
   public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
     // Directly return the record when partial-upsert is not enabled
     if (_partialUpsertHandler == null) {
@@ -498,4 +427,29 @@ public class PartitionUpsertMetadataManager {
       return record;
     }
   }
+
+  @VisibleForTesting
+  static class RecordLocation {
+    private final IndexSegment _segment;
+    private final int _docId;
+    private final Comparable _comparisonValue;
+
+    public RecordLocation(IndexSegment indexSegment, int docId, Comparable 
comparisonValue) {
+      _segment = indexSegment;
+      _docId = docId;
+      _comparisonValue = comparisonValue;
+    }
+
+    public IndexSegment getSegment() {
+      return _segment;
+    }
+
+    public int getDocId() {
+      return _docId;
+    }
+
+    public Comparable getComparisonValue() {
+      return _comparisonValue;
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
similarity index 51%
rename from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
rename to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 35dff04a75..67474e145d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -18,33 +18,23 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import org.apache.pinot.segment.spi.IndexSegment;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
 
 
 /**
- * Indicate a record's location on the local host.
+ * Implementation of {@link TableUpsertMetadataManager} that is backed by a 
{@link ConcurrentHashMap}.
  */
-public class RecordLocation {
-  private final IndexSegment _segment;
-  private final int _docId;
-  /** value used to denote the order */
-  private final Comparable _comparisonValue;
+@ThreadSafe
+public class ConcurrentMapTableUpsertMetadataManager extends 
BaseTableUpsertMetadataManager {
+  private final Map<Integer, ConcurrentMapPartitionUpsertMetadataManager> 
_partitionMetadataManagerMap =
+      new ConcurrentHashMap<>();
 
-  public RecordLocation(IndexSegment indexSegment, int docId, Comparable 
comparisonValue) {
-    _segment = indexSegment;
-    _docId = docId;
-    _comparisonValue = comparisonValue;
-  }
-
-  public IndexSegment getSegment() {
-    return _segment;
-  }
-
-  public int getDocId() {
-    return _docId;
-  }
-
-  public Comparable getComparisonValue() {
-    return _comparisonValue;
+  @Override
+  public ConcurrentMapPartitionUpsertMetadataManager 
getOrCreatePartitionManager(int partitionId) {
+    return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
+        k -> new 
ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, 
_primaryKeyColumns,
+            _comparisonColumn, _hashFunction, _partialUpsertHandler, 
_serverMetrics));
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index d1042fe5c1..2c5f68df45 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -18,40 +18,12 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.metrics.ServerGauge;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
-import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
-import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.RecordInfo;
-import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
-import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.utils.ByteArray;
-import org.roaringbitmap.PeekableIntIterator;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -78,424 +50,36 @@ import org.slf4j.LoggerFactory;
  *   </li>
  * </ul>
  */
-@SuppressWarnings({"rawtypes", "unchecked"})
 @ThreadSafe
-public class PartitionUpsertMetadataManager {
-  private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = 
TimeUnit.MINUTES.toNanos(1);
-
-  private final String _tableNameWithType;
-  private final int _partitionId;
-  private final List<String> _primaryKeyColumns;
-  private final String _comparisonColumn;
-  private final HashFunction _hashFunction;
-  private final PartialUpsertHandler _partialUpsertHandler;
-  private final ServerMetrics _serverMetrics;
-  private final Logger _logger;
-
-  // TODO(upsert): consider an off-heap KV store to persist this mapping to 
improve the recovery speed.
-  @VisibleForTesting
-  final ConcurrentHashMap<Object, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
-
-  @VisibleForTesting
-  final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
-
-  // Reused for reading previous record during partial upsert
-  private final GenericRow _reuse = new GenericRow();
-
-  private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
-  private int _numOutOfOrderEvents = 0;
-
-  public PartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, List<String> primaryKeyColumns,
-      String comparisonColumn, HashFunction hashFunction, @Nullable 
PartialUpsertHandler partialUpsertHandler,
-      ServerMetrics serverMetrics) {
-    _tableNameWithType = tableNameWithType;
-    _partitionId = partitionId;
-    _primaryKeyColumns = primaryKeyColumns;
-    _comparisonColumn = comparisonColumn;
-    _hashFunction = hashFunction;
-    _partialUpsertHandler = partialUpsertHandler;
-    _serverMetrics = serverMetrics;
-    _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + 
"-" + getClass().getSimpleName());
-  }
+public interface PartitionUpsertMetadataManager {
 
   /**
    * Returns the primary key columns.
    */
-  public List<String> getPrimaryKeyColumns() {
-    return _primaryKeyColumns;
-  }
+  List<String> getPrimaryKeyColumns();
 
   /**
    * Initializes the upsert metadata for the given immutable segment.
    */
-  public void addSegment(ImmutableSegment segment) {
-    addSegment(segment, null, null);
-  }
-
-  @VisibleForTesting
-  void addSegment(ImmutableSegment segment, @Nullable 
ThreadSafeMutableRoaringBitmap validDocIds,
-      @Nullable Iterator<RecordInfo> recordInfoIterator) {
-    String segmentName = segment.getSegmentName();
-    _logger.info("Adding segment: {}, current primary key count: {}", 
segmentName,
-        _primaryKeyToRecordLocationMap.size());
-
-    if (segment instanceof EmptyIndexSegment) {
-      _logger.info("Skip adding empty segment: {}", segmentName);
-      return;
-    }
-
-    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, 
segmentName);
-    segmentLock.lock();
-    try {
-      Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
-          "Got unsupported segment implementation: {} for segment: {}, table: 
{}", segment.getClass(), segmentName,
-          _tableNameWithType);
-      if (validDocIds == null) {
-        validDocIds = new ThreadSafeMutableRoaringBitmap();
-      }
-      if (recordInfoIterator == null) {
-        recordInfoIterator = getRecordInfoIterator(segment);
-      }
-      addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, 
recordInfoIterator, null, null);
-    } finally {
-      segmentLock.unlock();
-    }
-
-    // Update metrics
-    int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        numPrimaryKeys);
-
-    _logger.info("Finished adding segment: {}, current primary key count: {}", 
segmentName, numPrimaryKeys);
-  }
-
-  private Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment) 
{
-    int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
-    return new Iterator<RecordInfo>() {
-      private int _docId = 0;
-
-      @Override
-      public boolean hasNext() {
-        return _docId < numTotalDocs;
-      }
-
-      @Override
-      public RecordInfo next() {
-        PrimaryKey primaryKey = new PrimaryKey(new 
Object[_primaryKeyColumns.size()]);
-        getPrimaryKey(segment, _docId, primaryKey);
-
-        Object comparisonValue = segment.getValue(_docId, _comparisonColumn);
-        if (comparisonValue instanceof byte[]) {
-          comparisonValue = new ByteArray((byte[]) comparisonValue);
-        }
-        return new RecordInfo(primaryKey, _docId++, (Comparable) 
comparisonValue);
-      }
-    };
-  }
-
-  private void getPrimaryKey(IndexSegment segment, int docId, PrimaryKey 
buffer) {
-    Object[] values = buffer.getValues();
-    int numPrimaryKeyColumns = values.length;
-    for (int i = 0; i < numPrimaryKeyColumns; i++) {
-      Object value = segment.getValue(docId, _primaryKeyColumns.get(i));
-      if (value instanceof byte[]) {
-        value = new ByteArray((byte[]) value);
-      }
-      values[i] = value;
-    }
-  }
-
-  private void addOrReplaceSegment(ImmutableSegmentImpl segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
-      Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment 
oldSegment,
-      @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
-    String segmentName = segment.getSegmentName();
-    segment.enableUpsert(this, validDocIds);
-
-    AtomicInteger numKeysInWrongSegment = new AtomicInteger();
-    while (recordInfoIterator.hasNext()) {
-      RecordInfo recordInfo = recordInfoIterator.next();
-      
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
-          (primaryKey, currentRecordLocation) -> {
-            if (currentRecordLocation != null) {
-              // Existing primary key
-              IndexSegment currentSegment = currentRecordLocation.getSegment();
-              int comparisonResult =
-                  
recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue());
-
-              // The current record is in the same segment
-              // Update the record location when there is a tie to keep the 
newer record. Note that the record info
-              // iterator will return records with incremental doc ids.
-              if (currentSegment == segment) {
-                if (comparisonResult >= 0) {
-                  validDocIds.replace(currentRecordLocation.getDocId(), 
recordInfo.getDocId());
-                  return new RecordLocation(segment, recordInfo.getDocId(), 
recordInfo.getComparisonValue());
-                } else {
-                  return currentRecordLocation;
-                }
-              }
-
-              // The current record is in an old segment being replaced
-              // This could happen when committing a consuming segment, or 
reloading a completed segment. In this
-              // case, we want to update the record location when there is a 
tie because the record locations should
-              // point to the new added segment instead of the old segment 
being replaced. Also, do not update the valid
-              // doc ids for the old segment because it has not been replaced 
yet. We pass in an optional valid doc ids
-              // snapshot for the old segment, which can be updated and used 
to track the docs not replaced yet.
-              if (currentSegment == oldSegment) {
-                if (comparisonResult >= 0) {
-                  validDocIds.add(recordInfo.getDocId());
-                  if (validDocIdsForOldSegment != null) {
-                    
validDocIdsForOldSegment.remove(currentRecordLocation.getDocId());
-                  }
-                  return new RecordLocation(segment, recordInfo.getDocId(), 
recordInfo.getComparisonValue());
-                } else {
-                  return currentRecordLocation;
-                }
-              }
-
-              // This should not happen because the previously replaced 
segment should have all keys removed. We still
-              // handle it here, and also track the number of keys not 
properly replaced previously.
-              String currentSegmentName = currentSegment.getSegmentName();
-              if (currentSegmentName.equals(segmentName)) {
-                numKeysInWrongSegment.getAndIncrement();
-                if (comparisonResult >= 0) {
-                  validDocIds.add(recordInfo.getDocId());
-                  return new RecordLocation(segment, recordInfo.getDocId(), 
recordInfo.getComparisonValue());
-                } else {
-                  return currentRecordLocation;
-                }
-              }
-
-              // The current record is in a different segment
-              // Update the record location when getting a newer comparison 
value, or the value is the same as the
-              // current value, but the segment has a larger sequence number 
(the segment is newer than the current
-              // segment).
-              if (comparisonResult > 0 || (comparisonResult == 0 && 
LLCSegmentName.isLowLevelConsumerSegmentName(
-                  segmentName) && 
LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
-                  && LLCSegmentName.getSequenceNumber(segmentName) > 
LLCSegmentName.getSequenceNumber(
-                  currentSegmentName))) {
-                
Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentRecordLocation.getDocId());
-                validDocIds.add(recordInfo.getDocId());
-                return new RecordLocation(segment, recordInfo.getDocId(), 
recordInfo.getComparisonValue());
-              } else {
-                return currentRecordLocation;
-              }
-            } else {
-              // New primary key
-              validDocIds.add(recordInfo.getDocId());
-              return new RecordLocation(segment, recordInfo.getDocId(), 
recordInfo.getComparisonValue());
-            }
-          });
-    }
-    int numKeys = numKeysInWrongSegment.get();
-    if (numKeys > 0) {
-      _logger.warn("Found {} primary keys in the wrong segment when adding 
segment: {}", numKeys, segmentName);
-      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.UPSERT_KEYS_IN_WRONG_SEGMENT, numKeys);
-    }
-  }
+  void addSegment(ImmutableSegment segment);
 
   /**
    * Updates the upsert metadata for a new consumed record in the given 
consuming segment.
    */
-  public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
-    ThreadSafeMutableRoaringBitmap validDocIds = 
Objects.requireNonNull(segment.getValidDocIds());
-    
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
-        (primaryKey, currentRecordLocation) -> {
-          if (currentRecordLocation != null) {
-            // Existing primary key
-
-            // Update the record location when the new comparison value is 
greater than or equal to the current value.
-            // Update the record location when there is a tie to keep the 
newer record.
-            if 
(recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue())
 >= 0) {
-              IndexSegment currentSegment = currentRecordLocation.getSegment();
-              int currentDocId = currentRecordLocation.getDocId();
-              if (segment == currentSegment) {
-                validDocIds.replace(currentDocId, recordInfo.getDocId());
-              } else {
-                
Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentDocId);
-                validDocIds.add(recordInfo.getDocId());
-              }
-              return new RecordLocation(segment, recordInfo.getDocId(), 
recordInfo.getComparisonValue());
-            } else {
-              return currentRecordLocation;
-            }
-          } else {
-            // New primary key
-            validDocIds.add(recordInfo.getDocId());
-            return new RecordLocation(segment, recordInfo.getDocId(), 
recordInfo.getComparisonValue());
-          }
-        });
-
-    // Update metrics
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        _primaryKeyToRecordLocationMap.size());
-  }
+  void addRecord(MutableSegment segment, RecordInfo recordInfo);
 
   /**
    * Replaces the upsert metadata for the old segment with the new immutable 
segment.
    */
-  public void replaceSegment(ImmutableSegment segment, IndexSegment 
oldSegment) {
-    replaceSegment(segment, null, null, oldSegment);
-  }
-
-  @VisibleForTesting
-  void replaceSegment(ImmutableSegment segment, @Nullable 
ThreadSafeMutableRoaringBitmap validDocIds,
-      @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment 
oldSegment) {
-    String segmentName = segment.getSegmentName();
-    
Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
-        "Cannot replace segment with different name for table: {}, old 
segment: {}, new segment: {}",
-        _tableNameWithType, oldSegment.getSegmentName(), segmentName);
-    _logger.info("Replacing {} segment: {}, current primary key count: {}",
-        oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName,
-        _primaryKeyToRecordLocationMap.size());
-
-    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, 
segmentName);
-    segmentLock.lock();
-    try {
-      MutableRoaringBitmap validDocIdsForOldSegment =
-          oldSegment.getValidDocIds() != null ? 
oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
-      if (segment instanceof EmptyIndexSegment) {
-        _logger.info("Skip adding empty segment: {}", segmentName);
-      } else {
-        Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
-            "Got unsupported segment implementation: {} for segment: {}, 
table: {}", segment.getClass(), segmentName,
-            _tableNameWithType);
-        if (validDocIds == null) {
-          validDocIds = new ThreadSafeMutableRoaringBitmap();
-        }
-        if (recordInfoIterator == null) {
-          recordInfoIterator = getRecordInfoIterator(segment);
-        }
-        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, 
recordInfoIterator, oldSegment,
-            validDocIdsForOldSegment);
-      }
-
-      if (validDocIdsForOldSegment != null && 
!validDocIdsForOldSegment.isEmpty()) {
-        int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
-        if (_partialUpsertHandler != null) {
-          // For partial-upsert table, because we do not restore the original 
record location when removing the primary
-          // keys not replaced, it can potentially cause inconsistency between 
replicas. This can happen when a
-          // consuming segment is replaced by a committed segment that is 
consumed from a different server with
-          // different records (some stream consumer cannot guarantee 
consuming the messages in the same order).
-          _logger.warn("Found {} primary keys not replaced when replacing 
segment: {} for partial-upsert table. This "
-              + "can potentially cause inconsistency between replicas", 
numKeysNotReplaced, segmentName);
-          _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
-              numKeysNotReplaced);
-        } else {
-          _logger.info("Found {} primary keys not replaced when replacing 
segment: {}", numKeysNotReplaced,
-              segmentName);
-        }
-        removeSegment(oldSegment, validDocIdsForOldSegment);
-      }
-    } finally {
-      segmentLock.unlock();
-    }
-
-    if (!(oldSegment instanceof EmptyIndexSegment)) {
-      _replacedSegments.add(oldSegment);
-    }
-
-    // Update metrics
-    int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        numPrimaryKeys);
-
-    _logger.info("Finished replacing segment: {}, current primary key count: 
{}", segmentName, numPrimaryKeys);
-  }
+  void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment);
 
   /**
    * Removes the upsert metadata for the given segment.
    */
-  public void removeSegment(IndexSegment segment) {
-    String segmentName = segment.getSegmentName();
-    _logger.info("Removing {} segment: {}, current primary key count: {}",
-        segment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName,
-        _primaryKeyToRecordLocationMap.size());
-
-    if (_replacedSegments.remove(segment)) {
-      _logger.info("Skip removing replaced segment: {}", segmentName);
-      return;
-    }
-
-    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, 
segmentName);
-    segmentLock.lock();
-    try {
-      MutableRoaringBitmap validDocIds =
-          segment.getValidDocIds() != null ? 
segment.getValidDocIds().getMutableRoaringBitmap() : null;
-      if (validDocIds == null || validDocIds.isEmpty()) {
-        _logger.info("Skip removing segment without valid docs: {}", 
segmentName);
-        return;
-      }
-
-      _logger.info("Removing {} primary keys for segment: {}", 
validDocIds.getCardinality(), segmentName);
-      removeSegment(segment, validDocIds);
-    } finally {
-      segmentLock.unlock();
-    }
-
-    // Update metrics
-    int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        numPrimaryKeys);
-
-    _logger.info("Finished removing segment: {}, current primary key count: 
{}", segmentName, numPrimaryKeys);
-  }
-
-  private void removeSegment(IndexSegment segment, MutableRoaringBitmap 
validDocIds) {
-    assert !validDocIds.isEmpty();
-    PrimaryKey primaryKey = new PrimaryKey(new 
Object[_primaryKeyColumns.size()]);
-    PeekableIntIterator iterator = validDocIds.getIntIterator();
-    while (iterator.hasNext()) {
-      int docId = iterator.next();
-      getPrimaryKey(segment, docId, primaryKey);
-      
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey,
 _hashFunction),
-          (pk, recordLocation) -> {
-            if (recordLocation.getSegment() == segment) {
-              return null;
-            }
-            return recordLocation;
-          });
-    }
-  }
+  void removeSegment(IndexSegment segment);
 
   /**
    * Returns the merged record when partial-upsert is enabled.
    */
-  public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
-    // Directly return the record when partial-upsert is not enabled
-    if (_partialUpsertHandler == null) {
-      return record;
-    }
-
-    AtomicReference<GenericRow> previousRecordReference = new 
AtomicReference<>();
-    RecordLocation currentRecordLocation = 
_primaryKeyToRecordLocationMap.computeIfPresent(
-        HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), 
(pk, recordLocation) -> {
-          if 
(recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) 
>= 0) {
-            _reuse.clear();
-            
previousRecordReference.set(recordLocation.getSegment().getRecord(recordLocation.getDocId(),
 _reuse));
-          }
-          return recordLocation;
-        });
-    if (currentRecordLocation != null) {
-      // Existing primary key
-      GenericRow previousRecord = previousRecordReference.get();
-      if (previousRecord != null) {
-        return _partialUpsertHandler.merge(previousRecord, record);
-      } else {
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
-        _numOutOfOrderEvents++;
-        long currentTimeNs = System.nanoTime();
-        if (currentTimeNs - _lastOutOfOrderEventReportTimeNs > 
OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
-          _logger.warn("Skipped {} out-of-order events for partial-upsert 
table (the last event has current comparison "
-                  + "value: {}, record comparison value: {})", 
_numOutOfOrderEvents,
-              currentRecordLocation.getComparisonValue(), 
recordInfo.getComparisonValue());
-          _lastOutOfOrderEventReportTimeNs = currentTimeNs;
-          _numOutOfOrderEvents = 0;
-        }
-        return record;
-      }
-    } else {
-      // New primary key
-      return record;
-    }
-  }
+  GenericRow updateRecord(GenericRow record, RecordInfo recordInfo);
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/RecordInfo.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
similarity index 92%
rename from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/RecordInfo.java
rename to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
index 5e235c86e0..f4f139f6c3 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/RecordInfo.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.local.utils;
+package org.apache.pinot.segment.local.upsert;
 
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 
 
-public final class RecordInfo {
+@SuppressWarnings("rawtypes")
+public class RecordInfo {
   private final PrimaryKey _primaryKey;
   private final int _docId;
   private final Comparable _comparisonValue;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 108438e95e..ffafb999ee 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -18,45 +18,23 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
 
 
 /**
  * The manager of the upsert metadata of a table.
  */
 @ThreadSafe
-public class TableUpsertMetadataManager {
-  private final Map<Integer, PartitionUpsertMetadataManager> 
_partitionMetadataManagerMap = new ConcurrentHashMap<>();
-  private final String _tableNameWithType;
-  private final List<String> _primaryKeyColumns;
-  private final String _comparisonColumn;
-  private final HashFunction _hashFunction;
-  private final PartialUpsertHandler _partialUpsertHandler;
-  private final ServerMetrics _serverMetrics;
+public interface TableUpsertMetadataManager {
 
-  public TableUpsertMetadataManager(String tableNameWithType, List<String> 
primaryKeyColumns, String comparisonColumn,
-      HashFunction hashFunction, @Nullable PartialUpsertHandler 
partialUpsertHandler, ServerMetrics serverMetrics) {
-    _tableNameWithType = tableNameWithType;
-    _primaryKeyColumns = primaryKeyColumns;
-    _comparisonColumn = comparisonColumn;
-    _hashFunction = hashFunction;
-    _partialUpsertHandler = partialUpsertHandler;
-    _serverMetrics = serverMetrics;
-  }
+  void init(TableConfig tableConfig, Schema schema, TableDataManager 
tableDataManager, ServerMetrics serverMetrics);
 
-  public PartitionUpsertMetadataManager getOrCreatePartitionManager(int 
partitionId) {
-    return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
-        k -> new PartitionUpsertMetadataManager(_tableNameWithType, k, 
_primaryKeyColumns, _comparisonColumn,
-            _hashFunction, _partialUpsertHandler, _serverMetrics));
-  }
+  ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int 
partitionId);
 
-  public boolean isPartialUpsertEnabled() {
-    return _partialUpsertHandler != null;
-  }
+  UpsertConfig.Mode getUpsertMode();
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
new file mode 100644
index 0000000000..989f5a1e2c
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TableUpsertMetadataManagerFactory {
+  private TableUpsertMetadataManagerFactory() {
+  }
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
+
+  public static TableUpsertMetadataManager create(TableConfig tableConfig, 
Schema schema,
+      TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+    String tableNameWithType = tableConfig.getTableName();
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+    Preconditions.checkArgument(upsertConfig != null, "Must provide upsert 
config for table: %s", tableNameWithType);
+
+    TableUpsertMetadataManager metadataManager;
+    String metadataManagerClass = upsertConfig.getMetadataManagerClass();
+    if (StringUtils.isNotEmpty(metadataManagerClass)) {
+      LOGGER.info("Creating TableUpsertMetadataManager with class: {} for 
table: {}", metadataManagerClass,
+          tableNameWithType);
+      try {
+        metadataManager =
+            (TableUpsertMetadataManager) 
Class.forName(metadataManagerClass).getConstructor().newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format("Caught exception constructing 
TableUpsertMetadataManager with class: %s for table: %s",
+                metadataManagerClass, tableNameWithType));
+      }
+    } else {
+      LOGGER.info("Creating ConcurrentMapTableUpsertMetadataManager for table: 
{}", tableNameWithType);
+      metadataManager = new ConcurrentMapTableUpsertMetadataManager();
+    }
+
+    metadataManager.init(tableConfig, schema, tableDataManager, serverMetrics);
+    return metadataManager;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
new file mode 100644
index 0000000000..a7ea1b92f7
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+@SuppressWarnings("rawtypes")
+public class UpsertUtils {
+  private UpsertUtils() {
+  }
+
+  /**
+   * Returns an iterator of {@link RecordInfo} from the segment.
+   */
+  public static Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment 
segment, List<String> primaryKeyColumns,
+      String comparisonColumn) {
+    int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
+    return new Iterator<RecordInfo>() {
+      private int _docId = 0;
+
+      @Override
+      public boolean hasNext() {
+        return _docId < numTotalDocs;
+      }
+
+      @Override
+      public RecordInfo next() {
+        PrimaryKey primaryKey = new PrimaryKey(new 
Object[primaryKeyColumns.size()]);
+        getPrimaryKey(segment, primaryKeyColumns, _docId, primaryKey);
+
+        Object comparisonValue = segment.getValue(_docId, comparisonColumn);
+        if (comparisonValue instanceof byte[]) {
+          comparisonValue = new ByteArray((byte[]) comparisonValue);
+        }
+        return new RecordInfo(primaryKey, _docId++, (Comparable) 
comparisonValue);
+      }
+    };
+  }
+
+  /**
+   * Reads a primary key from the segment.
+   */
+  public static void getPrimaryKey(IndexSegment segment, List<String> 
primaryKeyColumns, int docId, PrimaryKey buffer) {
+    Object[] values = buffer.getValues();
+    int numPrimaryKeyColumns = values.length;
+    for (int i = 0; i < numPrimaryKeyColumns; i++) {
+      Object value = segment.getValue(docId, primaryKeyColumns.get(i));
+      if (value instanceof byte[]) {
+        value = new ByteArray((byte[]) value);
+      }
+      values[i] = value;
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
index 39f7d6ae98..ca305278ca 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
@@ -25,8 +25,8 @@ import java.util.Map;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.upsert.RecordInfo;
 import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.RecordInfo;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index 08cbf3e91c..a8d8e956c4 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -22,10 +22,10 @@ import java.io.File;
 import java.net.URL;
 import java.util.Collections;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
-import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -66,8 +66,8 @@ public class MutableSegmentImplUpsertComparisonColTest {
     _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
-        new TableUpsertMetadataManager("testTable_REALTIME", 
_schema.getPrimaryKeyColumns(), "offset",
-            HashFunction.NONE, null, 
mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
+        TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, 
mock(TableDataManager.class),
+            mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, 
Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), false, true, offsetUpsertConfig, 
"secondsSinceEpoch",
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index 015ba5705a..a2439d9f8f 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -22,9 +22,10 @@ import java.io.File;
 import java.net.URL;
 import java.util.Collections;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -64,8 +65,8 @@ public class MutableSegmentImplUpsertTest {
     _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
-        new TableUpsertMetadataManager("testTable_REALTIME", 
_schema.getPrimaryKeyColumns(), "secondsSinceEpoch",
-            hashFunction, null, 
mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
+        TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, 
mock(TableDataManager.class),
+            mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, 
Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), false, true, upsertConfigWithHash, 
"secondsSinceEpoch",
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
similarity index 96%
rename from 
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
rename to 
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index aa1392d6df..5a6e45573c 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -26,8 +26,8 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import 
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
 import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.RecordInfo;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -49,7 +49,7 @@ import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 
 
-public class PartitionUpsertMetadataManagerTest {
+public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
@@ -61,9 +61,9 @@ public class PartitionUpsertMetadataManagerTest {
   }
 
   private void verifyAddReplaceRemoveSegment(HashFunction hashFunction) {
-    PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
Collections.singletonList("pk"), "timeCol",
-            hashFunction, null, mock(ServerMetrics.class));
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
+            "timeCol", hashFunction, null, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -231,9 +231,9 @@ public class PartitionUpsertMetadataManagerTest {
   }
 
   private void verifyAddRecord(HashFunction hashFunction) {
-    PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
Collections.singletonList("pk"), "timeCol",
-            hashFunction, null, mock(ServerMetrics.class));
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, Collections.singletonList("pk"),
+            "timeCol", hashFunction, null, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index f687be7501..162e6b3030 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -54,6 +54,12 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Column for upsert comparison, default to time 
column")
   private String _comparisonColumn;
 
+  @JsonPropertyDescription("Custom class for upsert metadata manager")
+  private String _metadataManagerClass;
+
+  @JsonPropertyDescription("Custom configs for upsert metadata manager")
+  private Map<String, String> _metadataManagerConfigs;
+
   @Deprecated
   public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
       @JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy> 
partialUpsertStrategies,
@@ -105,6 +111,16 @@ public class UpsertConfig extends BaseJsonConfig {
     return _comparisonColumn;
   }
 
+  @Nullable
+  public String getMetadataManagerClass() {
+    return _metadataManagerClass;
+  }
+
+  @Nullable
+  public Map<String, String> getMetadataManagerConfigs() {
+    return _metadataManagerConfigs;
+  }
+
   public void setHashFunction(HashFunction hashFunction) {
     _hashFunction = hashFunction;
   }
@@ -136,4 +152,12 @@ public class UpsertConfig extends BaseJsonConfig {
   public void setComparisonColumn(String comparisonColumn) {
     _comparisonColumn = comparisonColumn;
   }
+
+  public void setMetadataManagerClass(String metadataManagerClass) {
+    _metadataManagerClass = metadataManagerClass;
+  }
+
+  public void setMetadataManagerConfigs(Map<String, String> 
metadataManagerConfigs) {
+    _metadataManagerConfigs = metadataManagerConfigs;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to