noob-se7en commented on code in PR #16783:
URL: https://github.com/apache/pinot/pull/16783#discussion_r2375684666
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -82,147 +87,221 @@
* | TimerTrackingTask | (CONSUMING -> DROPPED state change)
* |___________________|
*
- * TODO: handle bug situations like the one where a partition is not allocated
to a given server due to a bug.
*/
public class IngestionDelayTracker {
private static class IngestionInfo {
- final long _ingestionTimeMs;
- final long _firstStreamIngestionTimeMs;
- final StreamPartitionMsgOffset _currentOffset;
- final StreamPartitionMsgOffset _latestOffset;
+ volatile long _ingestionTimeMs;
+ volatile StreamPartitionMsgOffset _currentOffset;
+
+ IngestionInfo(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset
currentOffset) {
+ _ingestionTimeMs = ingestionTimeMs;
+ _currentOffset = currentOffset;
+ }
- IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs,
- @Nullable StreamPartitionMsgOffset currentOffset, @Nullable
StreamPartitionMsgOffset latestOffset) {
+ void update(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset
currentOffset) {
_ingestionTimeMs = ingestionTimeMs;
- _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
_currentOffset = currentOffset;
- _latestOffset = latestOffset;
}
}
private static final Logger LOGGER =
LoggerFactory.getLogger(IngestionDelayTracker.class);
-
// Sleep interval for scheduled executor service thread that triggers read
of ideal state
- private static final int SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS =
300000; // 5 minutes +/- precision in timeouts
+ // 5 minutes +/- precision in timeouts
+ private static final long METRICS_CLEANUP_INTERVAL_MS =
TimeUnit.MINUTES.toMillis(5);
+ // 30 seconds +/- precision in timeouts
+ private static final long METRICS_TRACKING_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(30);
// Once a partition is marked for verification, we wait 10 minutes to pull
its ideal state.
- private static final int PARTITION_TIMEOUT_MS = 600000; // 10
minutes timeouts
+ private static final long PARTITION_TIMEOUT_MS =
TimeUnit.MINUTES.toMillis(10);
// Delay scheduled executor service for this amount of time after starting
service
private static final int INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS = 100;
-
// Cache expire time for ignored segment if there is no update from the
segment.
private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
-
- // Per partition info for all partitions active for the current table.
- private final Map<Integer, IngestionInfo> _ingestionInfoMap = new
ConcurrentHashMap<>();
-
- // We mark partitions that go from CONSUMING to ONLINE in
_partitionsMarkedForVerification: if they do not
- // go back to CONSUMING in some period of time, we verify whether they are
still hosted in this server by reading
- // ideal state. This is done with the goal of minimizing reading ideal state
for efficiency reasons.
- // TODO: Consider removing this mechanism after releasing 1.2.0, and use
{@link #stopTrackingPartitionIngestionDelay}
- // instead.
- private final Map<Integer, Long> _partitionsMarkedForVerification = new
ConcurrentHashMap<>();
-
- private final Cache<String, Boolean> _segmentsToIgnore =
-
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES,
TimeUnit.MINUTES).build();
-
- // TODO: Make thread pool a server/cluster level config
- // ScheduledExecutorService to check partitions that are inactive against
ideal state.
- private final ScheduledExecutorService _scheduledExecutor =
Executors.newScheduledThreadPool(2);
+ // Timeout after 5 seconds while fetching the latest stream offset.
+ private static final long LATEST_STREAM_OFFSET_FETCH_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(5);
private final ServerMetrics _serverMetrics;
private final String _tableNameWithType;
private final String _metricName;
-
private final RealtimeTableDataManager _realTimeTableDataManager;
private final Supplier<Boolean> _isServerReadyToServeQueries;
+ private final Cache<String, Boolean> _segmentsToIgnore =
+
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES,
TimeUnit.MINUTES).build();
+ // Map to describe the partitions for which the metrics are being reported.
+ private final Map<Integer, Boolean> _partitionsTracked = new
ConcurrentHashMap<>();
+ // Map to hold the ingestion info reported by the consumer.
+ private final Map<Integer, IngestionInfo> _ingestionInfoMap = new
ConcurrentHashMap<>();
+ private final Map<Integer, Long> _partitionsMarkedForVerification = new
ConcurrentHashMap<>();
+ private Clock _clock = Clock.systemUTC();
+ private final ScheduledExecutorService _metricsCleanupScheduler;
+ private final ScheduledExecutorService _ingestionDelayTrackingScheduler;
- private Clock _clock;
+ protected volatile Map<Integer, StreamPartitionMsgOffset>
_partitionIdToLatestOffset;
+ // List of StreamMetadataProvider to fetch upstream latest stream offset
(List because table can have multiple
+ // upstream topics)
+ protected List<StreamMetadataProvider> _streamMetadataProviderList = new
ArrayList<>();
Review Comment:
Good point, and I had also thought about this before.
I had initially planned to address this later, but it turned out to be easy to
update the current implementation, so I have changed
`createStreamMetadataProvider` to `createOrUpdateStreamMetadataProvider`.
Whenever we fetch the partitionsHosted, we also check whether a
streamMetadataProvider exists for these partitions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]