Jackie-Jiang commented on code in PR #8708: URL: https://github.com/apache/pinot/pull/8708#discussion_r882111038
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableState.java: ########## @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils.tablestate; + +import java.util.Map; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.LiveInstance; +import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TableState { Review Comment: Suggest modeling it as a util class (`TableStateUtils`) with a single static method `public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType)`. The `_allSegmentsLoaded` flag can still be tracked within the metadata manager. We don't want this util class to track the loaded flag; instead, it should always recalculate the state. 
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ########## @@ -538,6 +539,7 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS } for (GenericRow transformedRow : reusedResult.getTransformedRows()) { try { + // TODO(saurabh): we may have dropped the record due to dedup. Should we increment indexedMessageCount? Review Comment: IMO it is okay to increment the value, since we are just tracking the number of rows fed into `index()`. We should use a separate metric to track the rows ignored because of dedup. ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -111,7 +116,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager { private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30; private UpsertConfig.Mode _upsertMode; + private boolean _isDedupEnabled; Review Comment: This flag is redundant; it is implied by the presence of `_tableDedupMetadataManager`. ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java: ########## @@ -288,8 +303,8 @@ public void setTierConfigsList(List<TierConfig> tierConfigsList) { } @JsonIgnore - public UpsertConfig.HashFunction getHashFunction() { - return _upsertConfig == null ? UpsertConfig.HashFunction.NONE : _upsertConfig.getHashFunction(); + public HashFunction getHashFunction() { Review Comment: We should remove this method. The hash function can come from either the upsert config or the dedup config. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java: ########## @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.dedup; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; +import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; +import org.apache.pinot.segment.local.utils.HashUtils; +import org.apache.pinot.segment.local.utils.RecordInfo; +import org.apache.pinot.segment.local.utils.tablestate.TableState; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.apache.pinot.spi.utils.ByteArray; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PartitionDedupMetadataManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class); + + private final String _tableNameWithType; + private final TableState _tableState; + private final List<String> _primaryKeyColumns; + private final int _partitionId; + private final ServerMetrics _serverMetrics; + private final HashFunction _hashFunction; + + // TODO(saurabh) : We can replace this with a ocncurrent Set + 
@VisibleForTesting + final ConcurrentHashMap<Object, Boolean> _primaryKeySet = new ConcurrentHashMap<>(); + + public PartitionDedupMetadataManager(String tableNameWithType, TableState tableState, List<String> primaryKeyColumns, + int partitionId, ServerMetrics serverMetrics, HashFunction hashFunction) { + _tableNameWithType = tableNameWithType; + _tableState = tableState; + _primaryKeyColumns = primaryKeyColumns; + _partitionId = partitionId; + _serverMetrics = serverMetrics; + _hashFunction = hashFunction; + } + + public void addSegment(IndexSegment segment) { + // Add all PKs to _primaryKeySet + Iterator<RecordInfo> recordInfoIterator = getRecordInfoIterator(segment); + while (recordInfoIterator.hasNext()) { + RecordInfo recordInfo = recordInfoIterator.next(); + _primaryKeySet.put(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), true); + } + } + + public void removeSegment(IndexSegment segment) { Review Comment: (MAJOR) We cannot always remove the PKs of a destroyed segment, because doing so can corrupt the metadata when a segment is reloaded (we first add the reloaded segment, then remove the previous one). We need to figure out a solution for this. One option is to track the segment reference when `addSegment()` is called, and only remove a PK when its reference points to the segment being removed. ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ########## @@ -1307,6 +1310,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo Set<String> fstIndexColumns = indexLoadingConfig.getFSTIndexColumns(); _fstIndexColumns = new ArrayList<>(fstIndexColumns); + boolean dedupEnabled = (tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled()); Review Comment: This flag is redundant.
It is implied by the presence of `partitionDedupMetadataManager`. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -505,34 +505,40 @@ static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { * - comparison column exists */ @VisibleForTesting - static void validateUpsertConfig(TableConfig tableConfig, Schema schema) { - if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) { + static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) { + if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE && tableConfig.getDedupConfig() == null) { return; } // check table type is realtime Preconditions - .checkState(tableConfig.getTableType() == TableType.REALTIME, "Upsert table is for realtime table only."); + .checkState(tableConfig.getTableType() == TableType.REALTIME, "Upsert/Dedup table is for realtime table only."); // primary key exists Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()), - "Upsert table must have primary key columns in the schema"); + "Upsert/Dedup table must have primary key columns in the schema"); // consumer type must be low-level Map<String, String> streamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), streamConfigsMap); Preconditions.checkState(streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType(), - "Upsert table must use low-level streaming consumer type"); + "Upsert/Dedup table must use low-level streaming consumer type"); // replica group is configured for routing Preconditions.checkState( tableConfig.getRoutingConfig() != null && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE .equalsIgnoreCase(tableConfig.getRoutingConfig().getInstanceSelectorType()), - "Upsert table must use strict replica-group (i.e.
strictReplicaGroup) based routing"); - // no startree index - Preconditions.checkState( - CollectionUtils.isEmpty(tableConfig.getIndexingConfig().getStarTreeIndexConfigs()) && !tableConfig - .getIndexingConfig().isEnableDefaultStarTree(), "The upsert table cannot have star-tree index."); - // comparison column exists - if (tableConfig.getUpsertConfig().getComparisonColumn() != null) { - String comparisonCol = tableConfig.getUpsertConfig().getComparisonColumn(); - Preconditions.checkState(schema.hasColumn(comparisonCol), "The comparison column does not exist on schema"); + "Upsert/Dedup table must use strict replica-group (i.e. strictReplicaGroup) based routing"); Review Comment: We should also validate that upsert and dedup cannot co-exist ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -505,34 +505,40 @@ static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { * - comparison column exists */ @VisibleForTesting - static void validateUpsertConfig(TableConfig tableConfig, Schema schema) { - if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) { + static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) { + if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE && tableConfig.getDedupConfig() == null) { return; } // check table type is realtime Preconditions - .checkState(tableConfig.getTableType() == TableType.REALTIME, "Upsert table is for realtime table only."); + .checkState(tableConfig.getTableType() == TableType.REALTIME, "Upsert/Dedup table is for realtime table only."); // primary key exists Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()), - "Upsert table must have primary key columns in the schema"); + "Upsert/Dedup table must have primary key columns in the schema"); // consumer type must be low-level Map<String, String> streamConfigsMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); StreamConfig 
streamConfig = new StreamConfig(tableConfig.getTableName(), streamConfigsMap); Preconditions.checkState(streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType(), - "Upsert table must use low-level streaming consumer type"); + "Upsert/Dedup table must use low-level streaming consumer type"); // replica group is configured for routing Preconditions.checkState( tableConfig.getRoutingConfig() != null && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE .equalsIgnoreCase(tableConfig.getRoutingConfig().getInstanceSelectorType()), - "Upsert table must use strict replica-group (i.e. strictReplicaGroup) based routing"); - // no startree index - Preconditions.checkState( - CollectionUtils.isEmpty(tableConfig.getIndexingConfig().getStarTreeIndexConfigs()) && !tableConfig - .getIndexingConfig().isEnableDefaultStarTree(), "The upsert table cannot have star-tree index."); - // comparison column exists - if (tableConfig.getUpsertConfig().getComparisonColumn() != null) { - String comparisonCol = tableConfig.getUpsertConfig().getComparisonColumn(); - Preconditions.checkState(schema.hasColumn(comparisonCol), "The comparison column does not exist on schema"); + "Upsert/Dedup table must use strict replica-group (i.e. 
strictReplicaGroup) based routing"); + + // specifically for upsert + if (tableConfig.getUpsertConfig() != null) { Review Comment: A non-null config doesn't mean it is enabled. ```suggestion if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) { ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -505,34 +505,40 @@ static void validateTaskConfigs(TableConfig tableConfig, Schema schema) { * - comparison column exists */ @VisibleForTesting - static void validateUpsertConfig(TableConfig tableConfig, Schema schema) { - if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) { + static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) { + if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE && tableConfig.getDedupConfig() == null) { Review Comment: A non-null dedup config doesn't mean dedup is enabled. We should either remove the `dedupEnabled` field and treat a non-null dedup config as dedup-enabled, or check the flag. 
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -364,9 +384,27 @@ public void addSegment(ImmutableSegment immutableSegment) { if (isUpsertEnabled()) { handleUpsert((ImmutableSegmentImpl) immutableSegment); } + + if (_isDedupEnabled) { + buildDedupeMeta((ImmutableSegmentImpl) immutableSegment); + } super.addSegment(immutableSegment); } + private void buildDedupeMeta(ImmutableSegmentImpl immutableSegment) { Review Comment: (nit) ```suggestion private void buildDedupMeta(ImmutableSegmentImpl immutableSegment) { ``` ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -364,9 +384,27 @@ public void addSegment(ImmutableSegment immutableSegment) { if (isUpsertEnabled()) { handleUpsert((ImmutableSegmentImpl) immutableSegment); } + + if (_isDedupEnabled) { + buildDedupeMeta((ImmutableSegmentImpl) immutableSegment); + } super.addSegment(immutableSegment); } + private void buildDedupeMeta(ImmutableSegmentImpl immutableSegment) { + // TODO(saurabh) refactor commons code with handleUpsert + String segmentName = immutableSegment.getSegmentName(); + Integer partitionGroupId = SegmentUtils + .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0)); + Preconditions.checkNotNull(partitionGroupId, String + .format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName, Review Comment: ```suggestion .format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName, ``` ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java: ########## @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.table; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.pinot.spi.config.BaseJsonConfig; + +public class DedupConfig extends BaseJsonConfig { + private final boolean _dedupEnabled; + private final HashFunction _hashFunction; + + @JsonCreator + public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) final boolean dedupEnabled, Review Comment: (minor) We don't usually mark local variables or parameters as `final`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org