noob-se7en commented on code in PR #16492:
URL: https://github.com/apache/pinot/pull/16492#discussion_r2256055315
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -796,9 +796,10 @@ private String createNewSegmentMetadata(TableConfig
tableConfig, IdealState idea
// segment. For tables with pauseless mode enabled, this size is
unavailable at this step because the
// segment has not yet been built.
- createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0),
newLLCSegment, newSegmentCreationTimeMs,
+ createNewSegmentZKMetadataWithOffsetAutoReset(tableConfig,
streamConfigs.get(0), newLLCSegment,
+ newSegmentCreationTimeMs,
committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, partitionIds.size(),
- numReplicas);
+ numReplicas, false);
Review Comment:
I suggest keeping the same method without changing its signature.
We can use the stream config inside the `createNewSegmentZKMetadata` method to
determine the next start offset.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +962,72 @@ private void createNewSegmentZKMetadata(TableConfig
tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ /**
+ * Creates and persists segment ZK metadata for the new CONSUMING segment.
+ */
+ private void createNewSegmentZKMetadataWithOffsetAutoReset(TableConfig
tableConfig, StreamConfig streamConfig,
+ LLCSegmentName newLLCSegmentName, long creationTimeMs,
CommittingSegmentDescriptor committingSegmentDescriptor,
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata,
InstancePartitions instancePartitions, int numPartitions,
+ int numReplicas, boolean skipAutoReset) {
+ String oldStartOffset = committingSegmentDescriptor.getNextOffset();
+ String startOffset = skipAutoReset ? oldStartOffset : computeStartOffset(
+ oldStartOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas,
+ startOffset);
+ }
+
+ private String computeStartOffset(
+ String nextOffset, StreamConfig streamConfig, int partitionId) {
+ long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+ int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+ if (timeThreshold <= 0 && offsetThreshold <= 0) {
+ return nextOffset;
+ }
+ String clientId =
+ PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
+ + streamConfig.getTopicName();
+ StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ StreamPartitionMsgOffset offsetAtSLA;
+ StreamPartitionMsgOffset latestOffset;
+ try (StreamMetadataProvider metadataProvider =
consumerFactory.createPartitionMetadataProvider(
+ clientId, partitionId)) {
+ // Fetching timestamp from an offset is an expensive operation which
requires reading the data,
+ // while fetching offset from timestamp is lightweight and only needs to
read metadata.
+ // Hence, instead of checking if latestOffset's time - nextOffset's time
< SLA, we would check
+ // (CurrentTime - SLA)'s offset > nextOffset.
+ // TODO: it is relying on System.currentTimeMillis() which might be
affected by time drift. If we are able to
+ // get nextOffset's time, we should instead check (nextOffset's time +
SLA)'s offset < latestOffset
+ latestOffset =
metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
5000);
+ LOGGER.info("Latest offset of topic {} and partition {} is {}",
+ streamConfig.getTopicName(), partitionId, latestOffset);
+ offsetAtSLA = metadataProvider.getOffsetAtTimestamp(
+ partitionId, System.currentTimeMillis() - timeThreshold * 1000);
+ LOGGER.info("Offset at SLA of topic {} and partition {} is {}",
+ streamConfig.getTopicName(), partitionId, offsetAtSLA);
+ } catch (Exception e) {
+ LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting
offsets", e);
+ return nextOffset;
+ }
+ try {
+ if (timeThreshold > 0 && offsetAtSLA != null
+ && Long.valueOf(offsetAtSLA.toString()) > Long.valueOf(nextOffset)) {
+ LOGGER.info("Auto reset offset from {} to {} on partition {} because
time threshold reached",
+ nextOffset, latestOffset, partitionId);
+ return latestOffset.toString();
+ }
+ if (offsetThreshold > 0
+ && Long.valueOf(latestOffset.toString()) - Long.valueOf(nextOffset)
> offsetThreshold) {
+ LOGGER.info("Auto reset offset from {} to {} on partition {} because
number of offsets threshold reached",
+ nextOffset, latestOffset, partitionId);
+ return latestOffset.toString();
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Not able to convert the offset to LONG type, skip auto
resetting offsets", e);
Review Comment:
```suggestion
LOGGER.warn("Unable to compare the offsets, skip auto resetting
offsets", e);
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +962,72 @@ private void createNewSegmentZKMetadata(TableConfig
tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ /**
+ * Creates and persists segment ZK metadata for the new CONSUMING segment.
+ */
+ private void createNewSegmentZKMetadataWithOffsetAutoReset(TableConfig
tableConfig, StreamConfig streamConfig,
+ LLCSegmentName newLLCSegmentName, long creationTimeMs,
CommittingSegmentDescriptor committingSegmentDescriptor,
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata,
InstancePartitions instancePartitions, int numPartitions,
+ int numReplicas, boolean skipAutoReset) {
+ String oldStartOffset = committingSegmentDescriptor.getNextOffset();
+ String startOffset = skipAutoReset ? oldStartOffset : computeStartOffset(
+ oldStartOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas,
+ startOffset);
+ }
+
+ private String computeStartOffset(
+ String nextOffset, StreamConfig streamConfig, int partitionId) {
+ long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+ int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+ if (timeThreshold <= 0 && offsetThreshold <= 0) {
+ return nextOffset;
+ }
+ String clientId =
+ PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
+ + streamConfig.getTopicName();
+ StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ StreamPartitionMsgOffset offsetAtSLA;
+ StreamPartitionMsgOffset latestOffset;
+ try (StreamMetadataProvider metadataProvider =
consumerFactory.createPartitionMetadataProvider(
+ clientId, partitionId)) {
+ // Fetching timestamp from an offset is an expensive operation which
requires reading the data,
+ // while fetching offset from timestamp is lightweight and only needs to
read metadata.
+ // Hence, instead of checking if latestOffset's time - nextOffset's time
< SLA, we would check
+ // (CurrentTime - SLA)'s offset > nextOffset.
+ // TODO: it is relying on System.currentTimeMillis() which might be
affected by time drift. If we are able to
+ // get nextOffset's time, we should instead check (nextOffset's time +
SLA)'s offset < latestOffset
+ latestOffset =
metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
5000);
+ LOGGER.info("Latest offset of topic {} and partition {} is {}",
+ streamConfig.getTopicName(), partitionId, latestOffset);
+ offsetAtSLA = metadataProvider.getOffsetAtTimestamp(
+ partitionId, System.currentTimeMillis() - timeThreshold * 1000);
+ LOGGER.info("Offset at SLA of topic {} and partition {} is {}",
+ streamConfig.getTopicName(), partitionId, offsetAtSLA);
+ } catch (Exception e) {
+ LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting
offsets", e);
+ return nextOffset;
+ }
+ try {
+ if (timeThreshold > 0 && offsetAtSLA != null
+ && Long.valueOf(offsetAtSLA.toString()) > Long.valueOf(nextOffset)) {
+ LOGGER.info("Auto reset offset from {} to {} on partition {} because
time threshold reached",
+ nextOffset, latestOffset, partitionId);
+ return latestOffset.toString();
+ }
+ if (offsetThreshold > 0
+ && Long.valueOf(latestOffset.toString()) - Long.valueOf(nextOffset)
> offsetThreshold) {
Review Comment:
Same here: please use `.compareTo` to compare the offsets.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +962,72 @@ private void createNewSegmentZKMetadata(TableConfig
tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ /**
+ * Creates and persists segment ZK metadata for the new CONSUMING segment.
+ */
+ private void createNewSegmentZKMetadataWithOffsetAutoReset(TableConfig
tableConfig, StreamConfig streamConfig,
+ LLCSegmentName newLLCSegmentName, long creationTimeMs,
CommittingSegmentDescriptor committingSegmentDescriptor,
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata,
InstancePartitions instancePartitions, int numPartitions,
+ int numReplicas, boolean skipAutoReset) {
+ String oldStartOffset = committingSegmentDescriptor.getNextOffset();
+ String startOffset = skipAutoReset ? oldStartOffset : computeStartOffset(
+ oldStartOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas,
+ startOffset);
+ }
+
+ private String computeStartOffset(
+ String nextOffset, StreamConfig streamConfig, int partitionId) {
+ long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+ int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+ if (timeThreshold <= 0 && offsetThreshold <= 0) {
+ return nextOffset;
+ }
Review Comment:
I suggest having a boolean flag in the stream config to skip the offset reset.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +962,72 @@ private void createNewSegmentZKMetadata(TableConfig
tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ /**
+ * Creates and persists segment ZK metadata for the new CONSUMING segment.
+ */
+ private void createNewSegmentZKMetadataWithOffsetAutoReset(TableConfig
tableConfig, StreamConfig streamConfig,
+ LLCSegmentName newLLCSegmentName, long creationTimeMs,
CommittingSegmentDescriptor committingSegmentDescriptor,
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata,
InstancePartitions instancePartitions, int numPartitions,
+ int numReplicas, boolean skipAutoReset) {
+ String oldStartOffset = committingSegmentDescriptor.getNextOffset();
+ String startOffset = skipAutoReset ? oldStartOffset : computeStartOffset(
+ oldStartOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas,
+ startOffset);
+ }
+
+ private String computeStartOffset(
+ String nextOffset, StreamConfig streamConfig, int partitionId) {
+ long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+ int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+ if (timeThreshold <= 0 && offsetThreshold <= 0) {
+ return nextOffset;
+ }
+ String clientId =
+ PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
+ + streamConfig.getTopicName();
+ StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ StreamPartitionMsgOffset offsetAtSLA;
+ StreamPartitionMsgOffset latestOffset;
+ try (StreamMetadataProvider metadataProvider =
consumerFactory.createPartitionMetadataProvider(
+ clientId, partitionId)) {
+ // Fetching timestamp from an offset is an expensive operation which
requires reading the data,
+ // while fetching offset from timestamp is lightweight and only needs to
read metadata.
+ // Hence, instead of checking if latestOffset's time - nextOffset's time
< SLA, we would check
+ // (CurrentTime - SLA)'s offset > nextOffset.
+ // TODO: it is relying on System.currentTimeMillis() which might be
affected by time drift. If we are able to
+ // get nextOffset's time, we should instead check (nextOffset's time +
SLA)'s offset < latestOffset
+ latestOffset =
metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
5000);
+ LOGGER.info("Latest offset of topic {} and partition {} is {}",
+ streamConfig.getTopicName(), partitionId, latestOffset);
+ offsetAtSLA = metadataProvider.getOffsetAtTimestamp(
+ partitionId, System.currentTimeMillis() - timeThreshold * 1000);
+ LOGGER.info("Offset at SLA of topic {} and partition {} is {}",
+ streamConfig.getTopicName(), partitionId, offsetAtSLA);
+ } catch (Exception e) {
+ LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting
offsets", e);
+ return nextOffset;
+ }
+ try {
+ if (timeThreshold > 0 && offsetAtSLA != null
+ && Long.valueOf(offsetAtSLA.toString()) > Long.valueOf(nextOffset)) {
Review Comment:
Please use `.compareTo` to compare offsets.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +962,72 @@ private void createNewSegmentZKMetadata(TableConfig
tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ /**
+ * Creates and persists segment ZK metadata for the new CONSUMING segment.
+ */
+ private void createNewSegmentZKMetadataWithOffsetAutoReset(TableConfig
tableConfig, StreamConfig streamConfig,
+ LLCSegmentName newLLCSegmentName, long creationTimeMs,
CommittingSegmentDescriptor committingSegmentDescriptor,
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata,
InstancePartitions instancePartitions, int numPartitions,
+ int numReplicas, boolean skipAutoReset) {
+ String oldStartOffset = committingSegmentDescriptor.getNextOffset();
+ String startOffset = skipAutoReset ? oldStartOffset : computeStartOffset(
+ oldStartOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas,
+ startOffset);
+ }
+
+ private String computeStartOffset(
+ String nextOffset, StreamConfig streamConfig, int partitionId) {
Review Comment:
nit: Let's follow the same lint style as the existing code. This applies to all changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]