somandal commented on code in PR #17099:
URL: https://github.com/apache/pinot/pull/17099#discussion_r2479017763


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+
+/**
+ * In-memory cache for tracking reload job status on server side.
+ * Phase 1: Only tracks failure count per job.
+ *
+ * <p>Thread-safe for concurrent access. Uses Guava Cache with LRU eviction
+ * and time-based expiration.
+ */
+@ThreadSafe
+public class ServerReloadJobStatusCache {
+  private static final Logger LOG = LoggerFactory.getLogger(ServerReloadJobStatusCache.class);
+
+  private final Cache<String, ReloadJobStatus> _cache;
+
+  /**
+   * Creates a cache with the given configuration.
+   *
+   */
+  public ServerReloadJobStatusCache() {
+    final ServerReloadJobStatusCacheConfig config = new ServerReloadJobStatusCacheConfig();
+    _cache = CacheBuilder.newBuilder()
+        .maximumSize(config.getMaxSize())
+        .expireAfterWrite(config.getTtlDays(), TimeUnit.DAYS)
+        .recordStats()
+        .build();
+
+    LOG.info("Initialized ReloadJobStatusCache with {}", config);
+  }
+
+  /**
+   * Gets the complete job status for the given job.
+   *
+   * @param jobId Reload job ID (UUID)
+   * @return Job status, or null if not found in cache
+   */
+  @Nullable
+  public ReloadJobStatus getJobStatus(String jobId) {
+    requireNonNull(jobId, "jobId cannot be null");
+
+    return _cache.getIfPresent(jobId);
+  }
+
+  /**
+   * Clears all entries from the cache.
+   * Useful for testing.

Review Comment:
   Should the comment be removed, or should this method be marked as visible for testing?
   Also, would it make sense to expose an API to clear this cache?
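A minimal sketch of the first option, assuming the method stays but its Javadoc explains when it is safe to call rather than saying "useful for testing". To keep it runnable with the JDK alone, a `ConcurrentHashMap` stands in for the Guava `Cache`; on the real `Cache` the body would be `_cache.invalidateAll()`. `ReloadJobStatusCacheSketch` and `recordFailure` are hypothetical names. If the method is meant to stay test-only, Guava's `@VisibleForTesting` annotation is the conventional marker instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical stand-in for ServerReloadJobStatusCache. A ConcurrentHashMap replaces
 * the Guava Cache so that this sketch compiles with the JDK alone.
 */
class ReloadJobStatusCacheSketch {
  private final Map<String, Integer> _failureCounts = new ConcurrentHashMap<>();

  void recordFailure(String jobId) {
    // merge() is atomic per key on ConcurrentHashMap
    _failureCounts.merge(jobId, 1, Integer::sum);
  }

  Integer getFailureCount(String jobId) {
    return _failureCounts.get(jobId);
  }

  /**
   * Removes every tracked job. Safe to call at any time: a later lookup of a cleared
   * job simply returns "not found". On the Guava cache this is _cache.invalidateAll().
   */
  void clear() {
    _failureCounts.clear();
  }

  int size() {
    return _failureCounts.size();
  }
}
```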



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java:
##########
@@ -22,10 +22,16 @@
 public class SegmentReloadStatusValue {
   private final long _totalSegmentCount;
   private final long _successCount;
+  private final Long _failureCount;

Review Comment:
   Can we use `long` here instead of `Long`? There is usually some overhead in Java to unbox. You could perhaps initialize this with `-1L`.
   
   Could we perhaps use `0` to indicate that there are actually 0 failures, and `-1L` to indicate that the servers haven't updated this due to a missing `reloadJobId`?
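A sketch of what this suggestion could look like, assuming the class otherwise keeps its current shape. `FAILURE_COUNT_UNKNOWN` and `isFailureCountKnown()` are hypothetical names, and the JSON annotations are omitted.

```java
/**
 * Hypothetical sketch of SegmentReloadStatusValue with a primitive failure count.
 * 0 means the server saw no failures for the job; -1 means the count was not reported
 * (no reloadJobId was supplied, or the job was not found in the server cache).
 */
class SegmentReloadStatusValueSketch {
  static final long FAILURE_COUNT_UNKNOWN = -1L;

  private final long _totalSegmentCount;
  private final long _successCount;
  private final long _failureCount;

  // Legacy two-argument form: the failure count is not known
  SegmentReloadStatusValueSketch(long totalSegmentCount, long successCount) {
    this(totalSegmentCount, successCount, FAILURE_COUNT_UNKNOWN);
  }

  SegmentReloadStatusValueSketch(long totalSegmentCount, long successCount, long failureCount) {
    _totalSegmentCount = totalSegmentCount;
    _successCount = successCount;
    _failureCount = failureCount;
  }

  long getTotalSegmentCount() {
    return _totalSegmentCount;
  }

  long getSuccessCount() {
    return _successCount;
  }

  long getFailureCount() {
    return _failureCount;
  }

  boolean isFailureCountKnown() {
    return _failureCount != FAILURE_COUNT_UNKNOWN;
  }
}
```

One trade-off to weigh: a `null` `Long` can be left out of the JSON entirely (depending on the Jackson inclusion settings), while `-1L` is always serialized, so whatever aggregates these values across servers on the controller has to skip `-1L` rather than add it to the total.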



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java:
##########
@@ -0,0 +1,99 @@
+  public ServerReloadJobStatusCache() {
+    final ServerReloadJobStatusCacheConfig config = new ServerReloadJobStatusCacheConfig();

Review Comment:
   Can we please ensure that we expose server-side configs to change these values in case we run into issues? I don't see the setters for `maxSize` or `ttlDays` being called. I would prefer that we add these as part of this PR itself.
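A minimal sketch of reading the two limits from server config, assuming plain `java.util.Properties` in place of Pinot's own configuration class. The property keys and default values below are hypothetical placeholders, not existing Pinot settings or the actual defaults in `ServerReloadJobStatusCacheConfig`.

```java
import java.util.Properties;

/**
 * Hypothetical config holder that reads cache limits from server properties.
 * Key names and defaults are placeholders for illustration only.
 */
final class ReloadJobStatusCacheConfigSketch {
  static final String MAX_SIZE_KEY = "pinot.server.reloadJobStatusCache.maxSize"; // hypothetical key
  static final String TTL_DAYS_KEY = "pinot.server.reloadJobStatusCache.ttlDays"; // hypothetical key
  static final long DEFAULT_MAX_SIZE = 10_000L; // placeholder default
  static final long DEFAULT_TTL_DAYS = 30L; // placeholder default

  private final long _maxSize;
  private final long _ttlDays;

  ReloadJobStatusCacheConfigSketch(Properties serverConf) {
    _maxSize = readPositiveLong(serverConf, MAX_SIZE_KEY, DEFAULT_MAX_SIZE);
    _ttlDays = readPositiveLong(serverConf, TTL_DAYS_KEY, DEFAULT_TTL_DAYS);
  }

  // Falls back to the default when unset; rejects zero or negative overrides
  private static long readPositiveLong(Properties conf, String key, long defaultValue) {
    String raw = conf.getProperty(key);
    if (raw == null) {
      return defaultValue;
    }
    long value = Long.parseLong(raw.trim());
    if (value <= 0) {
      throw new IllegalArgumentException(key + " must be positive, got " + value);
    }
    return value;
  }

  long getMaxSize() {
    return _maxSize;
  }

  long getTtlDays() {
    return _ttlDays;
  }

  @Override
  public String toString() {
    return "maxSize=" + _maxSize + ", ttlDays=" + _ttlDays;
  }
}
```

The cache constructor could then accept this config and pass it through unchanged, e.g. `CacheBuilder.newBuilder().maximumSize(config.getMaxSize()).expireAfterWrite(config.getTtlDays(), TimeUnit.DAYS)`, which also makes the values easy to override in tests.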



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ReloadJobStatus.java:
##########
@@ -0,0 +1,60 @@
+package org.apache.pinot.segment.local.utils;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Tracks status of a reload job.
+ * Phase 1: Only tracks failure count.

Review Comment:
   What other phases do we intend to add? It would be good to create an OSS issue listing all the phases, which we can refer to.
   
   Are we planning to eventually add something like the following?
   ```
           failedSegments.add(segmentName);
           sampleException.set(t);
   ```
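A sketch of what a later phase could look like if per-segment details are added, assuming failures are reported from several reload threads at once. Unlike `sampleException.set(t)` in the snippet above, this keeps the first exception via `compareAndSet`, so the sample does not keep changing while the job runs; which one to keep is a design choice. `ReloadJobStatusSketch` is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical later-phase ReloadJobStatus: besides the failure count, it keeps the names
 * of failed segments and one sample exception. All fields tolerate concurrent updates.
 */
class ReloadJobStatusSketch {
  private final AtomicInteger _failureCount = new AtomicInteger();
  private final Set<String> _failedSegments = ConcurrentHashMap.newKeySet();
  private final AtomicReference<Throwable> _sampleException = new AtomicReference<>();

  void recordFailure(String segmentName, Throwable t) {
    _failureCount.incrementAndGet();
    _failedSegments.add(segmentName);
    // Keep only the first exception so the stored sample is stable across later failures
    _sampleException.compareAndSet(null, t);
  }

  int getFailureCount() {
    return _failureCount.get();
  }

  // Immutable snapshot so callers cannot mutate internal state
  Set<String> getFailedSegments() {
    return Set.copyOf(_failedSegments);
  }

  Throwable getSampleException() {
    return _sampleException.get();
  }
}
```

For tables with many segments, the failed-segment set may need a size cap so that a badly failing job cannot grow the server-side cache without bound.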



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java:
##########
@@ -117,19 +118,19 @@ void deleteSegment(String tableNameWithType, String segmentName)
    * Download happens when local segment's CRC mismatches the one of the remote segment; but can also be forced to do
    * regardless of CRC.
    */
-  void reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload)
+  void reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload, String reloadJobId)
       throws Exception;
 
   /**
    * Reloads all segments of a table.
    */
-  void reloadAllSegments(String tableNameWithType, boolean forceDownload)
+  void reloadAllSegments(String tableNameWithType, boolean forceDownload, String reloadJobId)
       throws Exception;
 
   /**
    * Reload a list of segments in a table.
    */
-  void reloadSegments(String tableNameWithType, List<String> segmentNames, boolean forceDownload)
+  void reloadSegments(String tableNameWithType, List<String> segmentNames, boolean forceDownload, String reloadJobId)

Review Comment:
   Nit: only if it makes sense, for these functions, should we keep the original versions, which pass `reloadJobId` as null, and add new versions of the API that take the `reloadJobId`? This can help reduce the changes where we just pass null.
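A minimal sketch of this suggestion using default methods, assuming the interface can carry them. The three-argument versions keep their original signatures and forward `null` as the `reloadJobId`, so call sites that have no job ID stay untouched. `ReloadApiSketch` is a hypothetical, trimmed-down stand-in for `InstanceDataManager`.

```java
import java.util.List;

/**
 * Hypothetical subset of InstanceDataManager showing only the reload methods.
 * reloadJobId may be null when the caller has no job to track.
 */
interface ReloadApiSketch {
  void reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload, String reloadJobId)
      throws Exception;

  void reloadAllSegments(String tableNameWithType, boolean forceDownload, String reloadJobId)
      throws Exception;

  void reloadSegments(String tableNameWithType, List<String> segmentNames, boolean forceDownload,
      String reloadJobId)
      throws Exception;

  // Original signatures kept as overloads that forward a null reloadJobId
  default void reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload)
      throws Exception {
    reloadSegment(tableNameWithType, segmentName, forceDownload, null);
  }

  default void reloadAllSegments(String tableNameWithType, boolean forceDownload)
      throws Exception {
    reloadAllSegments(tableNameWithType, forceDownload, null);
  }

  default void reloadSegments(String tableNameWithType, List<String> segmentNames, boolean forceDownload)
      throws Exception {
    reloadSegments(tableNameWithType, segmentNames, forceDownload, null);
  }
}
```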



##########
pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java:
##########
@@ -186,9 +187,18 @@ public DimensionTableDataManager benchmark()
 
     String tableName = TABLE_NAME + "_" + _iteration;
     _tableDataManager = DimensionTableDataManager.createInstanceByTableName(tableName);
-    _tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, SCHEMA,
-        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, SEGMENT_OPERATIONS_THROTTLER,
-        false);
+    _tableDataManager.init(instanceDataManagerConfig,

Review Comment:
   Nit: was this formatting change intended? The same applies to other files with similar changes. Can we revert these?
   
   Let's avoid such formatting changes, as they make it harder to review and to ensure that only what needs to change has changed. Feel free to make separate PRs for formatting changes like this if you want.



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java:
##########
@@ -78,7 +91,17 @@ public String reloadJobStatus(@PathParam("tableNameWithType") String tableNameWi
           successCount++;
         }
       }
-      return JsonUtils.objectToString(new SegmentReloadStatusValue(totalSegmentCount, successCount));
+
+      // Query cache for failure count if reloadJobId is provided
+      Long failureCount = null;
+      if (reloadJobId != null) {
+        ReloadJobStatus jobStatus = _serverReloadJobStatusCache.getJobStatus(reloadJobId);
+        if (jobStatus != null) {
+          failureCount = (long) jobStatus.getFailureCount();
+        }
+      }
+
+      return JsonUtils.objectToString(new SegmentReloadStatusValue(totalSegmentCount, successCount, failureCount));

Review Comment:
   Not needed as part of this PR, but is there any plan to add `totalSegmentCount` and `successCount` to the cache itself? Would that improve the accuracy of the stats?
   
   How does this behave if segments are deleted or added after the reload was initiated?
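One possible answer to both questions, sketched under the assumption that the server records which segments a job targeted when the reload starts. Counting against that snapshot keeps segments added later out of the totals and drops deleted ones, instead of recomputing everything from the table's current segment list. All names here are hypothetical, and the two sets are not updated atomically together.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical progress tracker: remembers the segments a reload job targeted at submission
 * time and measures progress against that fixed set, filtered by the segments that still exist.
 */
class ReloadJobProgressSketch {
  private final Set<String> _targetSegments;
  private final Set<String> _reloaded = ConcurrentHashMap.newKeySet();
  private final Set<String> _failed = ConcurrentHashMap.newKeySet();

  ReloadJobProgressSketch(Set<String> segmentsAtSubmission) {
    _targetSegments = Set.copyOf(segmentsAtSubmission);
  }

  // Segments added after submission are not part of this job and are ignored.
  // The latest outcome wins: a success clears an earlier failure and vice versa.
  void markReloaded(String segmentName) {
    if (_targetSegments.contains(segmentName)) {
      _reloaded.add(segmentName);
      _failed.remove(segmentName);
    }
  }

  void markFailed(String segmentName) {
    if (_targetSegments.contains(segmentName)) {
      _failed.add(segmentName);
      _reloaded.remove(segmentName);
    }
  }

  // Deleted segments drop out of every count because they are missing from currentSegments
  long totalCount(Set<String> currentSegments) {
    return _targetSegments.stream().filter(currentSegments::contains).count();
  }

  long successCount(Set<String> currentSegments) {
    return _reloaded.stream().filter(currentSegments::contains).count();
  }

  long failureCount(Set<String> currentSegments) {
    return _failed.stream().filter(currentSegments::contains).count();
  }
}
```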



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java:
##########
@@ -44,17 +45,24 @@
 public interface TableDataManagerProvider {
 
   void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler);
+      @Nullable SegmentOperationsThrottler segmentOperationsThrottler, ServerReloadJobStatusCache reloadJobStatusCache);
 
-  TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema,
-      SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService segmentReloadRefreshExecutor,
+  TableDataManager getTableDataManager(TableConfig tableConfig,
+      Schema schema,
+      SegmentReloadSemaphore segmentRefreshSemaphore,
+      ExecutorService segmentReloadRefreshExecutor,
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
-      BooleanSupplier isServerReadyToServeQueries, boolean enableAsyncSegmentRefresh);
+      BooleanSupplier isServerReadyToServeQueries,
+      boolean enableAsyncSegmentRefresh,
+      ServerReloadJobStatusCache reloadJobStatusCache);
 
+  /**

Review Comment:
   Nit: is this comment really needed? The visible-for-testing annotation already indicates that it's used in tests, right? If you want to highlight that this shouldn't be used elsewhere, it would be good to add the reason why, so that it's clearer to future maintainers why they should avoid using it.



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java:
##########
@@ -44,18 +47,28 @@
 
 @Api(tags = "Tasks")
 @Path("/")
+@Singleton
 public class ControllerJobStatusResource {
 
+  private final ServerInstance _serverInstance;
+  private final ServerReloadJobStatusCache _serverReloadJobStatusCache;
+
   @Inject
-  private ServerInstance _serverInstance;
+  public ControllerJobStatusResource(ServerInstance serverInstance,
+      ServerReloadJobStatusCache serverReloadJobStatusCache) {
+    _serverInstance = serverInstance;
+    _serverReloadJobStatusCache = serverReloadJobStatusCache;
+  }
 
   @GET
   @Path("/controllerJob/reloadStatus/{tableNameWithType}")
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "Task status", notes = "Return the status of a given reload job")
   public String reloadJobStatus(@PathParam("tableNameWithType") String tableNameWithType,
       @QueryParam("reloadJobTimestamp") long reloadJobSubmissionTimestamp,
-      @QueryParam("segmentName") String segmentName, @Context HttpHeaders headers)
+      @QueryParam("segmentName") String segmentName,
+      @QueryParam("reloadJobId") String reloadJobId,

Review Comment:
   Is my understanding correct that if we don't provide the `reloadJobId`, it'll behave as before, and otherwise it'll check the cache?
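A sketch of the branch as this comment reads it, combined with the `-1L` suggestion made earlier on `SegmentReloadStatusValue`. A `Map` stands in for the server cache, and `FailureCountResolverSketch` is hypothetical. If cache entries are only created on a job's first failure, a miss cannot tell "zero failures" apart from "evicted or never seen", which is worth confirming against the PR.

```java
import java.util.Map;

/**
 * Hypothetical helper mirroring the branch on reloadJobId in reloadJobStatus().
 * A Map stands in for ServerReloadJobStatusCache (jobId -> failure count).
 */
final class FailureCountResolverSketch {
  static final long FAILURE_COUNT_UNKNOWN = -1L;

  private FailureCountResolverSketch() {
  }

  static long resolveFailureCount(String reloadJobId, Map<String, Integer> failureCountsByJobId) {
    if (reloadJobId == null) {
      // No job ID: behave as before and do not consult the cache
      return FAILURE_COUNT_UNKNOWN;
    }
    Integer failureCount = failureCountsByJobId.get(reloadJobId);
    // A miss may mean "never recorded" or "evicted/expired"; both are reported as unknown
    return failureCount == null ? FAILURE_COUNT_UNKNOWN : failureCount;
  }
}
```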



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java:
##########


Review Comment:
   Just for my understanding: is `loadTimeMs()` not updated if the segment reload results in a failure?
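To make the question concrete, a sketch assuming the loop in `reloadJobStatus` counts a segment as successful when its load time is at or after the job submission timestamp (the exact condition is not shown in the hunk above). Under that assumption, a failed reload that leaves the old load time in place is simply not counted, which makes it indistinguishable from a segment that has not been reloaded yet. `ReloadSuccessCheckSketch` is a hypothetical name.

```java
/**
 * Hypothetical model of the success check in reloadJobStatus(): a segment counts as
 * reloaded when its load time is at or after the job submission timestamp.
 */
final class ReloadSuccessCheckSketch {
  private ReloadSuccessCheckSketch() {
  }

  static long countSuccesses(long[] segmentLoadTimesMs, long reloadJobSubmissionTimestamp) {
    long successCount = 0;
    for (long loadTimeMs : segmentLoadTimesMs) {
      // A reload that failed and left the previous load time in place is not counted
      if (loadTimeMs >= reloadJobSubmissionTimestamp) {
        successCount++;
      }
    }
    return successCount;
  }
}
```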



##########
pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java:
##########
@@ -157,7 +158,9 @@ public void setUp()
         CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);
     _instanceId = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + hostname + "_" + port;
     serverConf.setProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_ID, _instanceId);
-    _adminApiApplication = new AdminApiApplication(_serverInstance, new AllowAllAccessFactory(), serverConf);
+    _adminApiApplication = new AdminApiApplication(_serverInstance, new AllowAllAccessFactory(),
+        mock(ServerReloadJobStatusCache.class),

Review Comment:
   Can we test the usage of this cache without mocking it in at least one test, to ensure that it works as expected?
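One shape such a test could take, sketched with the JDK only: several threads record failures for the same job concurrently against a real (non-mock) counter, and the final count must be exact. `FailureCounterSketch` is a hypothetical stand-in for the cache plus `ReloadJobStatus`; on the real class, Guava's `Cache.asMap()` view offers the same atomic `computeIfAbsent`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical non-mocked counter plus a helper that hammers it from several threads. */
class FailureCounterSketch {
  private final ConcurrentHashMap<String, AtomicInteger> _counts = new ConcurrentHashMap<>();

  void recordFailure(String jobId) {
    // computeIfAbsent is atomic, so concurrent first failures share one counter
    _counts.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
  }

  // Returns 0 for an unknown job, for brevity in this sketch
  int getFailureCount(String jobId) {
    AtomicInteger count = _counts.get(jobId);
    return count == null ? 0 : count.get();
  }

  static int runConcurrentFailures(int threads, int failuresPerThread)
      throws InterruptedException {
    FailureCounterSketch cache = new FailureCounterSketch();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.submit(() -> {
        for (int i = 0; i < failuresPerThread; i++) {
          cache.recordFailure("job-1");
        }
      });
    }
    pool.shutdown();
    if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
      throw new IllegalStateException("workers did not finish");
    }
    return cache.getFailureCount("job-1");
  }
}
```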



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
