Jackie-Jiang commented on a change in pull request #6113: URL: https://github.com/apache/incubator-pinot/pull/6113#discussion_r502127911
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -224,6 +242,16 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); Preconditions.checkNotNull(schema); + // TODO(upsert): better checking&hanlding of upsert mode/primary key change Review comment: Move this part into the `init()`, where you can read the table config and schema from the property store passed in. It is weird to reset these variables when adding each segment ########## File path: pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java ########## @@ -58,7 +59,17 @@ public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext) { public BaseFilterOperator run() { FilterContext filter = _queryContext.getFilter(); if (filter != null) { - return constructPhysicalOperator(filter, _queryContext.getDebugOptions()); + BaseFilterOperator filterOperator = constructPhysicalOperator(filter, _queryContext.getDebugOptions()); + if (_indexSegment.getValidDocIndex() != null) { + BaseFilterOperator validDocFilter = + new BitmapBasedFilterOperator(_indexSegment.getValidDocIndex().getValidDocBitmap(), false, _numDocs); + return FilterOperatorUtils.getAndFilterOperator(Lists.newArrayList(filterOperator, validDocFilter), _numDocs, Review comment: For better performance ```suggestion return FilterOperatorUtils.getAndFilterOperator(Arrays.asList(filterOperator, validDocFilter), _numDocs, ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/TableUpsertMetadataManager.java ########## @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.upsert; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; + +/** + * The manager of the upsert metadata of a table. + */ +@ThreadSafe +public class TableUpsertMetadataManager { + private final Map<Integer, PartitionUpsertMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>(); + + public TableUpsertMetadataManager() { + } + + public synchronized PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) { + if(!_partitionMetadataManagerMap.containsKey(partitionId)) { + _partitionMetadataManagerMap.put(partitionId, new PartitionUpsertMetadataManager(partitionId)); + } + return _partitionMetadataManagerMap.get(partitionId); + } + + public boolean isEmpty() { Review comment: (nit) I don't think this is useful? ########## File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/TableUpsertMetadataManager.java ########## @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.upsert; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; + +/** + * The manager of the upsert metadata of a table. + */ +@ThreadSafe +public class TableUpsertMetadataManager { + private final Map<Integer, PartitionUpsertMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>(); + + public TableUpsertMetadataManager() { + } + + public synchronized PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) { + if(!_partitionMetadataManagerMap.containsKey(partitionId)) { + _partitionMetadataManagerMap.put(partitionId, new PartitionUpsertMetadataManager(partitionId)); + } + return _partitionMetadataManagerMap.get(partitionId); + } Review comment: ```suggestion public PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) { return _partitionMetadataManagerMap.computeIfAbsent(partitionId, PartitionUpsertMetadataManager::new); } ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ########## @@ -453,6 +465,49 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) { return canTakeMore; } + private boolean isUpsertEnabled() { + return _upsertMode != null && _upsertMode != UpsertConfig.Mode.NONE; + } + + private void handleUpsert(GenericRow row, int docId) { + // below are upsert operations + PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); + Object timeValue = row.getValue(_timeColumnName); + Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable"); + long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue); + RecordLocation location = new RecordLocation(_segmentName, docId, timestamp); + // check local primary key index first + if (_primaryKeyIndex.containsKey(primaryKey)) { + RecordLocation prevLocation = _primaryKeyIndex.get(primaryKey); + if (location.getTimestamp() >= prevLocation.getTimestamp()) { + _primaryKeyIndex.put(primaryKey, location); + // update validDocIndex + _validDocIndex.remove(prevLocation.getDocId()); + _validDocIndex.checkAndAdd(location.getDocId()); + LOGGER.debug(String + .format("upsert: replace old doc id %d with %d for key: %s, hash: %d", prevLocation.getDocId(), + location.getDocId(), primaryKey, primaryKey.hashCode())); + } else { + LOGGER.debug( + String.format("upsert: ignore a late-arrived record: %s, hash: %d", primaryKey, primaryKey.hashCode())); + } + } else if (_partitionUpsertMetadataManager.containsKey(primaryKey)) { + RecordLocation prevLocation = _partitionUpsertMetadataManager.getRecordLocation(primaryKey); + if (location.getTimestamp() >= prevLocation.getTimestamp()) { + _partitionUpsertMetadataManager.removeRecordLocation(primaryKey); + _primaryKeyIndex.put(primaryKey, location); + + // update validDocIndex + _partitionUpsertMetadataManager.getValidDocIndex(prevLocation.getSegmentName()) + .remove(prevLocation.getDocId()); + _validDocIndex.checkAndAdd(location.getDocId()); + } + } else { + _primaryKeyIndex.put(primaryKey, location); + _validDocIndex.checkAndAdd(location.getDocId()); + } Review comment: Similarly, this part of the logic should be moved into the `PartitionUpsertMetadataManager` with concurrency control ########## File path: pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java ########## @@ -58,7 +59,17 @@ public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext) { public BaseFilterOperator run() { FilterContext filter = _queryContext.getFilter(); if (filter != null) { - return constructPhysicalOperator(filter, _queryContext.getDebugOptions()); + BaseFilterOperator filterOperator = constructPhysicalOperator(filter, _queryContext.getDebugOptions()); + if (_indexSegment.getValidDocIndex() != null) { Review comment: (nit) Cache `_indexSegment.getValidDocIndex()` into a local variable in line 61 ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -266,14 +293,71 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading manager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(streamPartitionId), - _serverMetrics); + _serverMetrics, _tableUpsertMetadataManager); } _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName); _segmentDataManagerMap.put(segmentName, manager); _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L); } } + private boolean isUpsertEnabled() { + return _upsertMode != null && (_upsertMode == UpsertConfig.Mode.FULL || _upsertMode == UpsertConfig.Mode.PARTIAL); Review comment: ```suggestion return _upsertMode == UpsertConfig.Mode.FULL || _upsertMode == UpsertConfig.Mode.PARTIAL; ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -266,14 +293,71 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading manager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(streamPartitionId), - _serverMetrics); + _serverMetrics, _tableUpsertMetadataManager); } _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName); _segmentDataManagerMap.put(segmentName, manager); _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L); } } + private boolean isUpsertEnabled() { + return _upsertMode != null && (_upsertMode == UpsertConfig.Mode.FULL || _upsertMode == UpsertConfig.Mode.PARTIAL); + } + + @Override + public void addSegment(ImmutableSegment immutableSegment) { + if (isUpsertEnabled()) { + handleUpsert(immutableSegment); + } + super.addSegment(immutableSegment); + } + + private void handleUpsert(ImmutableSegment immutableSegment) { + Preconditions.checkArgument(!_primaryKeyColumns.isEmpty(), "the primary key columns cannot be empty"); + Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>(); + for (String primaryKeyColumn : _primaryKeyColumns) { + columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(immutableSegment, primaryKeyColumn)); + } + columnToReaderMap.put(_timeColumnName, new PinotSegmentColumnReader(immutableSegment, _timeColumnName)); + int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs(); + String segmentName = immutableSegment.getSegmentName(); + int partitionId = new LLCSegmentName(immutableSegment.getSegmentName()).getPartitionId(); + PartitionUpsertMetadataManager partitionUpsertMetadataManager = + _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId); + for (int docId = 0; docId < numTotalDocs; docId++) { + Object[] fields = new Object[_primaryKeyColumns.size()]; + for (int i = 0; i < _primaryKeyColumns.size(); i++) { + fields[i] = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(docId); + } + PrimaryKey primaryKey = new PrimaryKey(fields); + Object timeValue = columnToReaderMap.get(_timeColumnName).getValue(docId); + Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable"); + long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue); + RecordLocation location = new RecordLocation(segmentName, docId, timestamp); + if (partitionUpsertMetadataManager.containsKey(primaryKey)) { + RecordLocation prevLocation = partitionUpsertMetadataManager.getRecordLocation(primaryKey); + // upsert + if (location.getTimestamp() >= prevLocation.getTimestamp()) { + partitionUpsertMetadataManager.removeRecordLocation(primaryKey); + partitionUpsertMetadataManager.putRecordLocation(primaryKey, location); + partitionUpsertMetadataManager.getValidDocIndex(prevLocation.getSegmentName()) + .remove(prevLocation.getDocId()); + partitionUpsertMetadataManager.getOrCreateValidDocIndex(segmentName).checkAndAdd(location.getDocId()); + LOGGER.debug(String + .format("upsert: replace old doc id %d with %d for key: %s, hash: %d", prevLocation.getDocId(), + location.getDocId(), primaryKey, primaryKey.hashCode())); + } else { + LOGGER.debug( + String.format("upsert: ignore a late-arrived record: %s, hash: %d", primaryKey, primaryKey.hashCode())); + } + } else { // append + partitionUpsertMetadataManager.putRecordLocation(primaryKey, location); + partitionUpsertMetadataManager.getOrCreateValidDocIndex(segmentName).checkAndAdd(location.getDocId()); + } Review comment: Move this logic into the `PartitionUpsertMetadataManager` class and you need to add concurrency control for it. Currently this part of the code is not thread-safe if 2 segments are updating upsert metadata at the same time ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org