mcvsubbu commented on a change in pull request #6682: URL: https://github.com/apache/incubator-pinot/pull/6682#discussion_r595315005
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java ########## @@ -45,42 +45,63 @@ private static final int EXTRA_TIME_SECONDS = 120; // Retransmit lease request 10% before lease expires. private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 / 10); - private static Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class); - private static final Map<String, SegmentBuildTimeLeaseExtender> INSTANCE_TO_LEASE_EXTENDER = new HashMap<>(1); + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class); + private static final Map<String, SegmentBuildTimeLeaseExtender> TABLE_TO_LEASE_EXTENDER = new ConcurrentHashMap<>(); + private static ScheduledExecutorService _executor; - private ScheduledExecutorService _executor; private final Map<String, Future> _segmentToFutureMap = new ConcurrentHashMap<>(); private final String _instanceId; + private final String _tableNameWithType; private final ServerSegmentCompletionProtocolHandler _protocolHandler; - public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String instanceId) { - return INSTANCE_TO_LEASE_EXTENDER.get(instanceId); + public static void initExecutor() { + _executor = new ScheduledThreadPoolExecutor(1); } - public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId, - ServerMetrics serverMetrics, String tableNameWithType) { - SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId); - if (leaseExtender != null) { - LOGGER.warn("Instance already exists"); - } else { - leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType); - INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender); + public static void shutdownExecutor() { + if (_executor != null) { + _executor.shutdownNow(); + _executor = null; } - return leaseExtender; + } + + @VisibleForTesting + 
public static boolean isExecutorShutdown() { + return _executor == null; + } + + public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String tableNameWithType) { + return TABLE_TO_LEASE_EXTENDER.get(tableNameWithType); + } + + public static SegmentBuildTimeLeaseExtender getOrCreate(final String instanceId, + ServerMetrics serverMetrics, String tableNameWithType) { + return TABLE_TO_LEASE_EXTENDER.compute(tableNameWithType, (k, v) -> { + if (v == null) { + return new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType); + } else { + LOGGER.warn("Lease extender for Table: {} already exists", tableNameWithType); Review comment: Why is this a warning? The method is to `getOrCreate()`. Getting the extender is perfectly ok. Please change it to INFO or no log at all. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java ########## @@ -45,42 +45,63 @@ private static final int EXTRA_TIME_SECONDS = 120; // Retransmit lease request 10% before lease expires. 
private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 / 10); - private static Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class); - private static final Map<String, SegmentBuildTimeLeaseExtender> INSTANCE_TO_LEASE_EXTENDER = new HashMap<>(1); + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class); + private static final Map<String, SegmentBuildTimeLeaseExtender> TABLE_TO_LEASE_EXTENDER = new ConcurrentHashMap<>(); + private static ScheduledExecutorService _executor; - private ScheduledExecutorService _executor; private final Map<String, Future> _segmentToFutureMap = new ConcurrentHashMap<>(); private final String _instanceId; + private final String _tableNameWithType; private final ServerSegmentCompletionProtocolHandler _protocolHandler; - public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String instanceId) { - return INSTANCE_TO_LEASE_EXTENDER.get(instanceId); + public static void initExecutor() { + _executor = new ScheduledThreadPoolExecutor(1); } - public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId, - ServerMetrics serverMetrics, String tableNameWithType) { - SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId); - if (leaseExtender != null) { - LOGGER.warn("Instance already exists"); - } else { - leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType); - INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender); + public static void shutdownExecutor() { + if (_executor != null) { + _executor.shutdownNow(); + _executor = null; } - return leaseExtender; + } + + @VisibleForTesting + public static boolean isExecutorShutdown() { + return _executor == null; + } + + public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String tableNameWithType) { + return TABLE_TO_LEASE_EXTENDER.get(tableNameWithType); + } + + public static 
SegmentBuildTimeLeaseExtender getOrCreate(final String instanceId, + ServerMetrics serverMetrics, String tableNameWithType) { + return TABLE_TO_LEASE_EXTENDER.compute(tableNameWithType, (k, v) -> { + if (v == null) { + return new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType); Review comment: Add a log here "Creating lease extender for the table {}" ########## File path: pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java ########## @@ -91,9 +92,12 @@ public synchronized void init(PinotConfiguration config, HelixManager helixManag Preconditions.checkState(instanceSegmentTarDir.mkdirs()); } + // Initialize segment build time lease extender executor + SegmentBuildTimeLeaseExtender.initExecutor(); + LOGGER.info("Initialized segment build time lease extender executor"); Review comment: Move this log to be inside the method initExecutor() ########## File path: pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java ########## @@ -108,6 +112,8 @@ public synchronized void shutDown() { for (TableDataManager tableDataManager : _tableDataManagerMap.values()) { tableDataManager.shutDown(); } + SegmentBuildTimeLeaseExtender.shutdownExecutor(); + LOGGER.info("Segment build time lease extender executor shut down"); Review comment: Move this log to be inside shutdownExecutor() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org