jamesyfshao commented on a change in pull request #6113:
URL: https://github.com/apache/incubator-pinot/pull/6113#discussion_r502975476



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.upsert;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manages the upsert metadata per partition.
+ */
+@ThreadSafe
+public class PartitionUpsertMetadataManager {

Review comment:
       We could define an interface and provide separate upsert and append 
implementations of the metadata manager. This would avoid much of the null 
checking and special-case handling in the upsert-related logic.
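
A minimal sketch of this suggestion, assuming a single callback invoked after each row is indexed. Every name here (`TableMetadataManager`, `AppendMetadataManager`, `UpsertMetadataManager`, `onRecordIndexed`) is hypothetical and not part of the PR, and a plain `String` stands in for `PrimaryKey`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical interface: the segment always calls it, so it never needs
// "is upsert enabled?" or null checks of its own.
interface TableMetadataManager {
  /** Returns true if the record is the latest version seen for its key. */
  boolean onRecordIndexed(String primaryKey, long timestamp);
}

/** Append-only tables: every record is valid, nothing to track. */
final class AppendMetadataManager implements TableMetadataManager {
  @Override
  public boolean onRecordIndexed(String primaryKey, long timestamp) {
    return true;
  }
}

/** Upsert tables: remember the latest timestamp per key. */
final class UpsertMetadataManager implements TableMetadataManager {
  private final Map<String, Long> _latestTimestamp = new ConcurrentHashMap<>();

  @Override
  public boolean onRecordIndexed(String primaryKey, long timestamp) {
    // merge() stores the larger of the old and new timestamps atomically.
    long winner = _latestTimestamp.merge(primaryKey, timestamp, Long::max);
    // Ties count as a win, matching the ">=" comparison used in the PR.
    return winner == timestamp;
  }
}
```

With this shape the caller holds a `TableMetadataManager` and the append case becomes a no-op implementation rather than a null field.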

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -453,6 +465,49 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
     return canTakeMore;
   }
 
+  private boolean isUpsertEnabled() {
+    return _upsertMode != null && _upsertMode != UpsertConfig.Mode.NONE;
+  }
+
+  private void handleUpsert(GenericRow row, int docId) {
+    // below are upsert operations
+    PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
+    Object timeValue = row.getValue(_timeColumnName);
+    Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable");
+    long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
+    RecordLocation location = new RecordLocation(_segmentName, docId, timestamp);
+    // check local primary key index first
+    if (_primaryKeyIndex.containsKey(primaryKey)) {
+      RecordLocation prevLocation = _primaryKeyIndex.get(primaryKey);
+      if (location.getTimestamp() >= prevLocation.getTimestamp()) {
+        _primaryKeyIndex.put(primaryKey, location);
+        // update validDocIndex
+        _validDocIndex.remove(prevLocation.getDocId());
+        _validDocIndex.checkAndAdd(location.getDocId());
+        LOGGER.debug(String
+            .format("upsert: replace old doc id %d with %d for key: %s, hash: %d", prevLocation.getDocId(),

Review comment:
       Recommend using {} placeholders for string substitution in logging 
instead of String.format.
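
A small sketch of why this matters, assuming debug logging is disabled. `DisabledDebugLogger` is a hypothetical stand-in that only mimics the shape of SLF4J's `Logger.debug(String format, Object... arguments)`; it is not SLF4J itself. The point it illustrates: `String.format` builds the message before the logger is even called, while `{}` placeholders let the logger skip formatting entirely.

```java
// Stand-in for a logger whose debug level is off (hypothetical, not SLF4J).
final class DisabledDebugLogger {
  void debug(String message) {
    // Level disabled: the already-built message is simply dropped.
  }

  void debug(String format, Object... arguments) {
    // Level disabled: arguments are never formatted, so toString() is never called.
  }
}

/** Counts how often it is converted to a string. */
final class CountingKey {
  int toStringCalls = 0;

  @Override
  public String toString() {
    toStringCalls++;
    return "key";
  }
}

final class LoggingDemo {
  /** Returns the toString() call count after the eager call and after the placeholder call. */
  static int[] run() {
    DisabledDebugLogger logger = new DisabledDebugLogger();
    CountingKey key = new CountingKey();

    // Eager: String.format runs (and calls key.toString()) even though nothing is logged.
    logger.debug(String.format("upsert: replace old doc id %d with %d for key: %s", 1, 2, key));
    int afterEager = key.toStringCalls;

    // Deferred: the template and arguments are handed over unformatted.
    logger.debug("upsert: replace old doc id {} with {} for key: {}", 1, 2, key);
    int afterDeferred = key.toStringCalls;

    return new int[] {afterEager, afterDeferred};
  }
}
```

On a hot ingestion path the eager form pays the formatting cost for every row, whether or not debug output is enabled.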

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.upsert;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manages the upsert metadata per partition.
+ */
+@ThreadSafe
+public class PartitionUpsertMetadataManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
+
+  private final int _partitionId;
+
+  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyIndex = new ConcurrentHashMap<>();

Review comment:
       Back to my previous point about memory usage: maybe you can extract 
this HashMap behind an interface that wraps the actual implementation, so we 
can explore other implementations of the primary key mapping later.
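
One possible shape for such a wrapper, as a sketch only. `PrimaryKeyIndex` and `InMemoryPrimaryKeyIndex` are hypothetical names, and the generic parameters stand in for the PR's `PrimaryKey` and `RecordLocation` types:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical abstraction over the primary key mapping.
interface PrimaryKeyIndex<K, V> {
  /** Returns the stored location for the key, or null if the key is unknown. */
  V get(K key);

  void put(K key, V location);

  void remove(K key);
}

/** Default implementation backed by the same ConcurrentHashMap the PR uses today. */
final class InMemoryPrimaryKeyIndex<K, V> implements PrimaryKeyIndex<K, V> {
  private final ConcurrentHashMap<K, V> _map = new ConcurrentHashMap<>();

  @Override
  public V get(K key) {
    return _map.get(key);
  }

  @Override
  public void put(K key, V location) {
    _map.put(key, location);
  }

  @Override
  public void remove(K key) {
    _map.remove(key);
  }
}
```

The manager would then depend only on `PrimaryKeyIndex`, so a more memory-efficient implementation could be swapped in without touching the upsert logic.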

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.upsert;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manages the upsert metadata per partition.
+ */
+@ThreadSafe
+public class PartitionUpsertMetadataManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
+
+  private final int _partitionId;
+
+  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyIndex = new ConcurrentHashMap<>();
+  // the mapping between the (sealed) segment and its validDocuments
+  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIndexMap = new ConcurrentHashMap<>();
+
+  public PartitionUpsertMetadataManager(int partitionId) {
+    _partitionId = partitionId;
+  }
+
+  public void removeRecordLocation(PrimaryKey primaryKey) {
+    _primaryKeyIndex.remove(primaryKey);
+  }
+
+  public boolean containsKey(PrimaryKey primaryKey) {
+    return _primaryKeyIndex.containsKey(primaryKey);
+  }
+
+  public RecordLocation getRecordLocation(PrimaryKey primaryKey) {
+    return _primaryKeyIndex.get(primaryKey);
+  }
+
+  public ThreadSafeMutableRoaringBitmap getValidDocIndex(String segmentName) {
+    return _segmentToValidDocIndexMap.get(segmentName);
+  }
+
+  private ThreadSafeMutableRoaringBitmap getOrCreateValidDocIndex(String segmentName) {
+    return _segmentToValidDocIndexMap.computeIfAbsent(segmentName, k->new ThreadSafeMutableRoaringBitmap());
+  }
+
+  public synchronized void removeUpsertMetadata(String segmentName) {
+    _segmentToValidDocIndexMap.remove(segmentName);
+    for (Map.Entry<PrimaryKey, RecordLocation> entry : new HashSet<>(_primaryKeyIndex.entrySet())) {
+      if (entry.getValue().getSegmentName().equals(segmentName)) {
+        _primaryKeyIndex.remove(entry.getKey());
+      }
+    }
+  }
+
+  int getPartitionId() {
+    return _partitionId;
+  }
+
+  public synchronized void handUpsert(PrimaryKey primaryKey, RecordLocation location, String segmentName) {

Review comment:
       handleUpsert?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.upsert;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manages the upsert metadata per partition.
+ */
+@ThreadSafe
+public class PartitionUpsertMetadataManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
+
+  private final int _partitionId;
+
+  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyIndex = new ConcurrentHashMap<>();
+  // the mapping between the (sealed) segment and its validDocuments
+  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIndexMap = new ConcurrentHashMap<>();
+
+  public PartitionUpsertMetadataManager(int partitionId) {
+    _partitionId = partitionId;
+  }
+
+  public void removeRecordLocation(PrimaryKey primaryKey) {
+    _primaryKeyIndex.remove(primaryKey);
+  }
+
+  public boolean containsKey(PrimaryKey primaryKey) {
+    return _primaryKeyIndex.containsKey(primaryKey);
+  }
+
+  public RecordLocation getRecordLocation(PrimaryKey primaryKey) {
+    return _primaryKeyIndex.get(primaryKey);
+  }
+
+  public ThreadSafeMutableRoaringBitmap getValidDocIndex(String segmentName) {
+    return _segmentToValidDocIndexMap.get(segmentName);
+  }
+
+  private ThreadSafeMutableRoaringBitmap getOrCreateValidDocIndex(String segmentName) {
+    return _segmentToValidDocIndexMap.computeIfAbsent(segmentName, k->new ThreadSafeMutableRoaringBitmap());
+  }
+
+  public synchronized void removeUpsertMetadata(String segmentName) {
+    _segmentToValidDocIndexMap.remove(segmentName);
+    for (Map.Entry<PrimaryKey, RecordLocation> entry : new HashSet<>(_primaryKeyIndex.entrySet())) {
+      if (entry.getValue().getSegmentName().equals(segmentName)) {
+        _primaryKeyIndex.remove(entry.getKey());
+      }
+    }
+  }
+
+  int getPartitionId() {
+    return _partitionId;
+  }
+
+  public synchronized void handUpsert(PrimaryKey primaryKey, RecordLocation location, String segmentName) {
+    if (containsKey(primaryKey)) {

Review comment:
       Piggybacking on @Jackie-Jiang's comment in the other class: I think we 
should move all primary key interaction out of the segment and into this class.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ValidDocIndexReader.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+public interface ValidDocIndexReader {
+
+  /**
+   * Return the underlying validDoc bitmap (used in query execution)
+   */
+  ImmutableRoaringBitmap getValidDocBitmap();

Review comment:
       This question might be better addressed in the design doc, but have we 
measured the performance and space trade-off of using RoaringBitmap for 
validDoc? An alternative would be a simple bit array/byte array to store this 
data. It would be good to explain the choice further in the design doc or in 
the code.
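
For comparison, a sketch of the plain bit-array alternative using `java.util.BitSet`. `BitSetValidDocIndex` and its methods are hypothetical names, not part of the PR, and no measurement is implied here. The storage of a `BitSet` grows with the highest doc id it has to cover rather than with the number of valid docs, which is the kind of trade-off the design doc could quantify against RoaringBitmap.

```java
import java.util.BitSet;

// Hypothetical valid-doc index backed by a plain bit array.
final class BitSetValidDocIndex {
  private final BitSet _validDocs;

  BitSetValidDocIndex(int expectedDocs) {
    _validDocs = new BitSet(expectedDocs);
  }

  // BitSet is not thread-safe, so all access is synchronized here.
  synchronized void add(int docId) {
    _validDocs.set(docId);
  }

  synchronized void remove(int docId) {
    _validDocs.clear(docId);
  }

  synchronized boolean contains(int docId) {
    return _validDocs.get(docId);
  }

  /** Approximate backing storage: one bit per doc id slot currently allocated. */
  synchronized int approxSizeInBytes() {
    return _validDocs.size() / 8;
  }
}
```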

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -453,6 +465,49 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
     return canTakeMore;
   }
 
+  private boolean isUpsertEnabled() {
+    return _upsertMode != null && _upsertMode != UpsertConfig.Mode.NONE;
+  }
+
+  private void handleUpsert(GenericRow row, int docId) {
+    // below are upsert operations
+    PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
+    Object timeValue = row.getValue(_timeColumnName);
+    Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable");
+    long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
+    RecordLocation location = new RecordLocation(_segmentName, docId, timestamp);
+    // check local primary key index first
+    if (_primaryKeyIndex.containsKey(primaryKey)) {

Review comment:
       It seems that we don't update the record location in 
_partitionUpsertMetadataManager in this if branch. Is there any concern for 
data correctness here?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.upsert;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manages the upsert metadata per partition.
+ */
+@ThreadSafe
+public class PartitionUpsertMetadataManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
+
+  private final int _partitionId;
+
+  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyIndex = new ConcurrentHashMap<>();
+  // the mapping between the (sealed) segment and its validDocuments
+  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIndexMap = new ConcurrentHashMap<>();
+
+  public PartitionUpsertMetadataManager(int partitionId) {
+    _partitionId = partitionId;
+  }
+
+  public void removeRecordLocation(PrimaryKey primaryKey) {
+    _primaryKeyIndex.remove(primaryKey);
+  }
+
+  public boolean containsKey(PrimaryKey primaryKey) {
+    return _primaryKeyIndex.containsKey(primaryKey);
+  }
+
+  public RecordLocation getRecordLocation(PrimaryKey primaryKey) {
+    return _primaryKeyIndex.get(primaryKey);
+  }
+
+  public ThreadSafeMutableRoaringBitmap getValidDocIndex(String segmentName) {
+    return _segmentToValidDocIndexMap.get(segmentName);
+  }
+
+  private ThreadSafeMutableRoaringBitmap getOrCreateValidDocIndex(String segmentName) {
+    return _segmentToValidDocIndexMap.computeIfAbsent(segmentName, k->new ThreadSafeMutableRoaringBitmap());
+  }
+
+  public synchronized void removeUpsertMetadata(String segmentName) {
+    _segmentToValidDocIndexMap.remove(segmentName);
+    for (Map.Entry<PrimaryKey, RecordLocation> entry : new HashSet<>(_primaryKeyIndex.entrySet())) {
+      if (entry.getValue().getSegmentName().equals(segmentName)) {
+        _primaryKeyIndex.remove(entry.getKey());
+      }
+    }
+  }
+
+  int getPartitionId() {
+    return _partitionId;
+  }
+
+  public synchronized void handUpsert(PrimaryKey primaryKey, RecordLocation location, String segmentName) {

Review comment:
       handleUpsert()?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.upsert;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manages the upsert metadata per partition.
+ */
+@ThreadSafe
+public class PartitionUpsertMetadataManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
+
+  private final int _partitionId;
+
+  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyIndex = new ConcurrentHashMap<>();
+  // the mapping between the (sealed) segment and its validDocuments
+  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIndexMap = new ConcurrentHashMap<>();
+
+  public PartitionUpsertMetadataManager(int partitionId) {
+    _partitionId = partitionId;
+  }
+
+  public void removeRecordLocation(PrimaryKey primaryKey) {
+    _primaryKeyIndex.remove(primaryKey);
+  }
+
+  public boolean containsKey(PrimaryKey primaryKey) {
+    return _primaryKeyIndex.containsKey(primaryKey);
+  }
+
+  public RecordLocation getRecordLocation(PrimaryKey primaryKey) {
+    return _primaryKeyIndex.get(primaryKey);
+  }
+
+  public ThreadSafeMutableRoaringBitmap getValidDocIndex(String segmentName) {
+    return _segmentToValidDocIndexMap.get(segmentName);
+  }
+
+  private ThreadSafeMutableRoaringBitmap getOrCreateValidDocIndex(String segmentName) {
+    return _segmentToValidDocIndexMap.computeIfAbsent(segmentName, k->new ThreadSafeMutableRoaringBitmap());
+  }
+
+  public synchronized void removeUpsertMetadata(String segmentName) {
+    _segmentToValidDocIndexMap.remove(segmentName);
+    for (Map.Entry<PrimaryKey, RecordLocation> entry : new HashSet<>(_primaryKeyIndex.entrySet())) {
+      if (entry.getValue().getSegmentName().equals(segmentName)) {
+        _primaryKeyIndex.remove(entry.getKey());
+      }
+    }
+  }
+
+  int getPartitionId() {
+    return _partitionId;
+  }
+
+  public synchronized void handUpsert(PrimaryKey primaryKey, RecordLocation location, String segmentName) {
+    if (containsKey(primaryKey)) {
+      RecordLocation prevLocation = getRecordLocation(primaryKey);
+      // upsert
+      if (location.getTimestamp() >= prevLocation.getTimestamp()) {
+        removeRecordLocation(primaryKey);
+        _primaryKeyIndex.put(primaryKey, location);
+        getValidDocIndex(prevLocation.getSegmentName())

Review comment:
       Do we need to handle the case where the old DocIndex has already been 
deleted because the old segment was removed? That could cause a 
NullPointerException here.
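
A sketch of a defensive lookup, assuming the valid-doc index for a removed segment can indeed be missing at this point (the PR would need to confirm whether that ordering is possible). `ValidDocTracker` and its methods are hypothetical, and `java.util.BitSet` stands in for `ThreadSafeMutableRoaringBitmap`:

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker of per-segment valid docs.
final class ValidDocTracker {
  private final Map<String, BitSet> _segmentToValidDocs = new ConcurrentHashMap<>();

  synchronized void markValid(String segmentName, int docId) {
    _segmentToValidDocs.computeIfAbsent(segmentName, k -> new BitSet()).set(docId);
  }

  synchronized void removeSegment(String segmentName) {
    _segmentToValidDocs.remove(segmentName);
  }

  /**
   * Invalidates the old doc. Returns false instead of throwing when the
   * segment's index has already been removed.
   */
  synchronized boolean invalidate(String segmentName, int docId) {
    BitSet validDocs = _segmentToValidDocs.get(segmentName);
    if (validDocs == null) {
      // Segment is gone, so there is nothing left to invalidate.
      return false;
    }
    validDocs.clear(docId);
    return true;
  }

  synchronized boolean isValid(String segmentName, int docId) {
    BitSet validDocs = _segmentToValidDocs.get(segmentName);
    return validDocs != null && validDocs.get(docId);
  }
}
```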



