jamesyfshao commented on a change in pull request #6113: URL: https://github.com/apache/incubator-pinot/pull/6113#discussion_r502975476
########## File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.upsert; + +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Manages the upsert metadata per partition. + */ +@ThreadSafe +public class PartitionUpsertMetadataManager { Review comment: we can think about defining an interface and allow the implementation of upsert/append Metadata Manager. 
This would prevent a lot of null checking/special-case handling of upsert-related logic ########## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ########## @@ -453,6 +465,49 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) { return canTakeMore; } + private boolean isUpsertEnabled() { + return _upsertMode != null && _upsertMode != UpsertConfig.Mode.NONE; + } + + private void handleUpsert(GenericRow row, int docId) { + // below are upsert operations + PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); + Object timeValue = row.getValue(_timeColumnName); + Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable"); + long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue); + RecordLocation location = new RecordLocation(_segmentName, docId, timestamp); + // check local primary key index first + if (_primaryKeyIndex.containsKey(primaryKey)) { + RecordLocation prevLocation = _primaryKeyIndex.get(primaryKey); + if (location.getTimestamp() >= prevLocation.getTimestamp()) { + _primaryKeyIndex.put(primaryKey, location); + // update validDocIndex + _validDocIndex.remove(prevLocation.getDocId()); + _validDocIndex.checkAndAdd(location.getDocId()); + LOGGER.debug(String + .format("upsert: replace old doc id %d with %d for key: %s, hash: %d", prevLocation.getDocId(), Review comment: I recommend using {} placeholders for string substitution in logging instead of String.format ########## File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.upsert; + +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Manages the upsert metadata per partition. + */ +@ThreadSafe +public class PartitionUpsertMetadataManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class); + + private final int _partitionId; + + private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyIndex = new ConcurrentHashMap<>(); Review comment: Back to my previous point about memory usage: I think you could extract this HashMap behind an interface that wraps the actual implementation, so that we can potentially explore other implementations for the primary key mapping ########## File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.upsert; + +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Manages the upsert metadata per partition. 
+ */ +@ThreadSafe +public class PartitionUpsertMetadataManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class); + + private final int _partitionId; + + private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyIndex = new ConcurrentHashMap<>(); + // the mapping between the (sealed) segment and its validDocuments + private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIndexMap = new ConcurrentHashMap<>(); + + public PartitionUpsertMetadataManager(int partitionId) { + _partitionId = partitionId; + } + + public void removeRecordLocation(PrimaryKey primaryKey) { + _primaryKeyIndex.remove(primaryKey); + } + + public boolean containsKey(PrimaryKey primaryKey) { + return _primaryKeyIndex.containsKey(primaryKey); + } + + public RecordLocation getRecordLocation(PrimaryKey primaryKey) { + return _primaryKeyIndex.get(primaryKey); + } + + public ThreadSafeMutableRoaringBitmap getValidDocIndex(String segmentName) { + return _segmentToValidDocIndexMap.get(segmentName); + } + + private ThreadSafeMutableRoaringBitmap getOrCreateValidDocIndex(String segmentName) { + return _segmentToValidDocIndexMap.computeIfAbsent(segmentName, k->new ThreadSafeMutableRoaringBitmap()); + } + + public synchronized void removeUpsertMetadata(String segmentName) { + _segmentToValidDocIndexMap.remove(segmentName); + for (Map.Entry<PrimaryKey, RecordLocation> entry : new HashSet<>(_primaryKeyIndex.entrySet())) { + if (entry.getValue().getSegmentName().equals(segmentName)) { + _primaryKeyIndex.remove(entry.getKey()); + } + } + } + + int getPartitionId() { + return _partitionId; + } + + public synchronized void handUpsert(PrimaryKey primaryKey, RecordLocation location, String segmentName) { Review comment: handleUpsert? 
########## File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.upsert; + +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Manages the upsert metadata per partition. 
+ */ +@ThreadSafe +public class PartitionUpsertMetadataManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class); + + private final int _partitionId; + + private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyIndex = new ConcurrentHashMap<>(); + // the mapping between the (sealed) segment and its validDocuments + private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIndexMap = new ConcurrentHashMap<>(); + + public PartitionUpsertMetadataManager(int partitionId) { + _partitionId = partitionId; + } + + public void removeRecordLocation(PrimaryKey primaryKey) { + _primaryKeyIndex.remove(primaryKey); + } + + public boolean containsKey(PrimaryKey primaryKey) { + return _primaryKeyIndex.containsKey(primaryKey); + } + + public RecordLocation getRecordLocation(PrimaryKey primaryKey) { + return _primaryKeyIndex.get(primaryKey); + } + + public ThreadSafeMutableRoaringBitmap getValidDocIndex(String segmentName) { + return _segmentToValidDocIndexMap.get(segmentName); + } + + private ThreadSafeMutableRoaringBitmap getOrCreateValidDocIndex(String segmentName) { + return _segmentToValidDocIndexMap.computeIfAbsent(segmentName, k->new ThreadSafeMutableRoaringBitmap()); + } + + public synchronized void removeUpsertMetadata(String segmentName) { + _segmentToValidDocIndexMap.remove(segmentName); + for (Map.Entry<PrimaryKey, RecordLocation> entry : new HashSet<>(_primaryKeyIndex.entrySet())) { + if (entry.getValue().getSegmentName().equals(segmentName)) { + _primaryKeyIndex.remove(entry.getKey()); + } + } + } + + int getPartitionId() { + return _partitionId; + } + + public synchronized void handUpsert(PrimaryKey primaryKey, RecordLocation location, String segmentName) { + if (containsKey(primaryKey)) { Review comment: piggyback on @Jackie-Jiang comment in other class, I think we should abstract all primary key interaction from segment to this class ########## File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ValidDocIndexReader.java ########## @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.index.readers; + +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + + +public interface ValidDocIndexReader { + + /** + * Return the underlying validDoc bitmap (used in query execution) + */ + ImmutableRoaringBitmap getValidDocBitmap(); Review comment: This question might be better addressed in the design doc, but I wonder whether we have measured the performance and space trade-off of using RoaringBitmap for validDoc. 
Another alternative would be to just use a simple bit array/byte array to store this data; it would be interesting to see this explained further in the design doc or in the code ########## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ########## @@ -453,6 +465,49 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) { return canTakeMore; } + private boolean isUpsertEnabled() { + return _upsertMode != null && _upsertMode != UpsertConfig.Mode.NONE; + } + + private void handleUpsert(GenericRow row, int docId) { + // below are upsert operations + PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); + Object timeValue = row.getValue(_timeColumnName); + Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable"); + long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue); + RecordLocation location = new RecordLocation(_segmentName, docId, timestamp); + // check local primary key index first + if (_primaryKeyIndex.containsKey(primaryKey)) { Review comment: It seems that we don't update the record location in _partitionUpsertMetadataManager in this if branch; is there any concern about data correctness here? ########## File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.upsert; + +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Manages the upsert metadata per partition. + */ +@ThreadSafe +public class PartitionUpsertMetadataManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class); + + private final int _partitionId; + + private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyIndex = new ConcurrentHashMap<>(); + // the mapping between the (sealed) segment and its validDocuments + private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIndexMap = new ConcurrentHashMap<>(); + + public PartitionUpsertMetadataManager(int partitionId) { + _partitionId = partitionId; + } + + public void removeRecordLocation(PrimaryKey primaryKey) { + _primaryKeyIndex.remove(primaryKey); + } + + public boolean containsKey(PrimaryKey primaryKey) { + return _primaryKeyIndex.containsKey(primaryKey); + } + + public RecordLocation getRecordLocation(PrimaryKey primaryKey) { + return _primaryKeyIndex.get(primaryKey); + } + + public ThreadSafeMutableRoaringBitmap getValidDocIndex(String segmentName) { + return _segmentToValidDocIndexMap.get(segmentName); + } + + private ThreadSafeMutableRoaringBitmap 
getOrCreateValidDocIndex(String segmentName) { + return _segmentToValidDocIndexMap.computeIfAbsent(segmentName, k->new ThreadSafeMutableRoaringBitmap()); + } + + public synchronized void removeUpsertMetadata(String segmentName) { + _segmentToValidDocIndexMap.remove(segmentName); + for (Map.Entry<PrimaryKey, RecordLocation> entry : new HashSet<>(_primaryKeyIndex.entrySet())) { + if (entry.getValue().getSegmentName().equals(segmentName)) { + _primaryKeyIndex.remove(entry.getKey()); + } + } + } + + int getPartitionId() { + return _partitionId; + } + + public synchronized void handUpsert(PrimaryKey primaryKey, RecordLocation location, String segmentName) { Review comment: handleUpsert()? ########## File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.upsert; + +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Manages the upsert metadata per partition. + */ +@ThreadSafe +public class PartitionUpsertMetadataManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class); + + private final int _partitionId; + + private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyIndex = new ConcurrentHashMap<>(); + // the mapping between the (sealed) segment and its validDocuments + private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIndexMap = new ConcurrentHashMap<>(); + + public PartitionUpsertMetadataManager(int partitionId) { + _partitionId = partitionId; + } + + public void removeRecordLocation(PrimaryKey primaryKey) { + _primaryKeyIndex.remove(primaryKey); + } + + public boolean containsKey(PrimaryKey primaryKey) { + return _primaryKeyIndex.containsKey(primaryKey); + } + + public RecordLocation getRecordLocation(PrimaryKey primaryKey) { + return _primaryKeyIndex.get(primaryKey); + } + + public ThreadSafeMutableRoaringBitmap getValidDocIndex(String segmentName) { + return _segmentToValidDocIndexMap.get(segmentName); + } + + private ThreadSafeMutableRoaringBitmap getOrCreateValidDocIndex(String segmentName) { + return _segmentToValidDocIndexMap.computeIfAbsent(segmentName, k->new ThreadSafeMutableRoaringBitmap()); + } + + public synchronized void removeUpsertMetadata(String segmentName) { + _segmentToValidDocIndexMap.remove(segmentName); + for (Map.Entry<PrimaryKey, RecordLocation> entry : new HashSet<>(_primaryKeyIndex.entrySet())) { + if (entry.getValue().getSegmentName().equals(segmentName)) { 
+ _primaryKeyIndex.remove(entry.getKey()); + } + } + } + + int getPartitionId() { + return _partitionId; + } + + public synchronized void handUpsert(PrimaryKey primaryKey, RecordLocation location, String segmentName) { + if (containsKey(primaryKey)) { + RecordLocation prevLocation = getRecordLocation(primaryKey); + // upsert + if (location.getTimestamp() >= prevLocation.getTimestamp()) { + removeRecordLocation(primaryKey); + _primaryKeyIndex.put(primaryKey, location); + getValidDocIndex(prevLocation.getSegmentName()) Review comment: Do we need to handle the case where the old validDocIndex has been deleted because the old segment was removed? getValidDocIndex() might return null in that case and cause a NullPointerException ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org