mcvsubbu commented on a change in pull request #6682:
URL: https://github.com/apache/incubator-pinot/pull/6682#discussion_r595315005



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
##########
@@ -45,42 +45,63 @@
   private static final int EXTRA_TIME_SECONDS = 120;
   // Retransmit lease request 10% before lease expires.
   private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 
/ 10);
-  private static Logger LOGGER = 
LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
-  private static final Map<String, SegmentBuildTimeLeaseExtender> 
INSTANCE_TO_LEASE_EXTENDER = new HashMap<>(1);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
+  private static final Map<String, SegmentBuildTimeLeaseExtender> 
TABLE_TO_LEASE_EXTENDER = new ConcurrentHashMap<>();
+  private static ScheduledExecutorService _executor;
 
-  private ScheduledExecutorService _executor;
   private final Map<String, Future> _segmentToFutureMap = new 
ConcurrentHashMap<>();
   private final String _instanceId;
+  private final String _tableNameWithType;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
 
-  public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String 
instanceId) {
-    return INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
+  public static void initExecutor() {
+    _executor = new ScheduledThreadPoolExecutor(1);
   }
 
-  public static synchronized SegmentBuildTimeLeaseExtender create(final String 
instanceId,
-      ServerMetrics serverMetrics, String tableNameWithType) {
-    SegmentBuildTimeLeaseExtender leaseExtender = 
INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
-    if (leaseExtender != null) {
-      LOGGER.warn("Instance already exists");
-    } else {
-      leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, 
serverMetrics, tableNameWithType);
-      INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender);
+  public static void shutdownExecutor() {
+    if (_executor != null) {
+      _executor.shutdownNow();
+      _executor = null;
     }
-    return leaseExtender;
+  }
+
+  @VisibleForTesting
+  public static boolean isExecutorShutdown() {
+    return _executor == null;
+  }
+
+  public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String 
tableNameWithType) {
+    return TABLE_TO_LEASE_EXTENDER.get(tableNameWithType);
+  }
+
+  public static SegmentBuildTimeLeaseExtender getOrCreate(final String 
instanceId,
+      ServerMetrics serverMetrics, String tableNameWithType) {
+    return TABLE_TO_LEASE_EXTENDER.compute(tableNameWithType, (k, v) -> {
+      if (v == null) {
+        return new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, 
tableNameWithType);
+      } else {
+        LOGGER.warn("Lease extender for Table: {} already exists", 
tableNameWithType);

Review comment:
       Why is this a warning? The method is to `getOrCreate()`. Getting the 
extender is perfectly ok. Please change it to INFO or no log at all. 

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
##########
@@ -45,42 +45,63 @@
   private static final int EXTRA_TIME_SECONDS = 120;
   // Retransmit lease request 10% before lease expires.
   private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 
/ 10);
-  private static Logger LOGGER = 
LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
-  private static final Map<String, SegmentBuildTimeLeaseExtender> 
INSTANCE_TO_LEASE_EXTENDER = new HashMap<>(1);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
+  private static final Map<String, SegmentBuildTimeLeaseExtender> 
TABLE_TO_LEASE_EXTENDER = new ConcurrentHashMap<>();
+  private static ScheduledExecutorService _executor;
 
-  private ScheduledExecutorService _executor;
   private final Map<String, Future> _segmentToFutureMap = new 
ConcurrentHashMap<>();
   private final String _instanceId;
+  private final String _tableNameWithType;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
 
-  public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String 
instanceId) {
-    return INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
+  public static void initExecutor() {
+    _executor = new ScheduledThreadPoolExecutor(1);
   }
 
-  public static synchronized SegmentBuildTimeLeaseExtender create(final String 
instanceId,
-      ServerMetrics serverMetrics, String tableNameWithType) {
-    SegmentBuildTimeLeaseExtender leaseExtender = 
INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
-    if (leaseExtender != null) {
-      LOGGER.warn("Instance already exists");
-    } else {
-      leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, 
serverMetrics, tableNameWithType);
-      INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender);
+  public static void shutdownExecutor() {
+    if (_executor != null) {
+      _executor.shutdownNow();
+      _executor = null;
     }
-    return leaseExtender;
+  }
+
+  @VisibleForTesting
+  public static boolean isExecutorShutdown() {
+    return _executor == null;
+  }
+
+  public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String 
tableNameWithType) {
+    return TABLE_TO_LEASE_EXTENDER.get(tableNameWithType);
+  }
+
+  public static SegmentBuildTimeLeaseExtender getOrCreate(final String 
instanceId,
+      ServerMetrics serverMetrics, String tableNameWithType) {
+    return TABLE_TO_LEASE_EXTENDER.compute(tableNameWithType, (k, v) -> {
+      if (v == null) {
+        return new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, 
tableNameWithType);

Review comment:
       Add a log here "Creating lease extender for the table {}"

##########
File path: 
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
##########
@@ -91,9 +92,12 @@ public synchronized void init(PinotConfiguration config, 
HelixManager helixManag
       Preconditions.checkState(instanceSegmentTarDir.mkdirs());
     }
 
+    // Initialize segment build time lease extender executor
+    SegmentBuildTimeLeaseExtender.initExecutor();
+    LOGGER.info("Initialized segment build time lease extender executor");

Review comment:
       Move this log to be inside the method initExecutor()

##########
File path: 
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
##########
@@ -108,6 +112,8 @@ public synchronized void shutDown() {
     for (TableDataManager tableDataManager : _tableDataManagerMap.values()) {
       tableDataManager.shutDown();
     }
+    SegmentBuildTimeLeaseExtender.shutdownExecutor();
+    LOGGER.info("Segment build time lease extender executor shut down");

Review comment:
       Move this log to be inside shutDownExecutor()




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to